This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a06800e90b [Feature][Connector] Add Apache Cloudberry Support (#8985)
a06800e90b is described below

commit a06800e90b5685632ef788fd85aebd5c8fad2412
Author: chenhongyu <2795267...@qq.com>
AuthorDate: Mon Mar 24 10:54:09 2025 +0800

    [Feature][Connector] Add Apache Cloudberry Support (#8985)
---
 .../connector-v2/changelog/connector-cloudberry.md |   7 +
 docs/en/connector-v2/sink/Cloudberry.md            | 176 ++++++++++++++++++
 docs/en/connector-v2/source/Cloudberry.md          | 152 +++++++++++++++
 .../seatunnel/jdbc/JdbcCloudberryIT.java           | 206 +++++++++++++++++++++
 .../resources/jdbc_cloudberry_source_and_sink.conf |  57 ++++++
 5 files changed, 598 insertions(+)

diff --git a/docs/en/connector-v2/changelog/connector-cloudberry.md 
b/docs/en/connector-v2/changelog/connector-cloudberry.md
new file mode 100644
index 0000000000..749ae587cd
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-cloudberry.md
@@ -0,0 +1,7 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+|[Feature][Connector] Add Apache Cloudberry Support (#8985)|https://github.com/apache/seatunnel/commit/b6f82c1|dev|
+
+</details>
diff --git a/docs/en/connector-v2/sink/Cloudberry.md 
b/docs/en/connector-v2/sink/Cloudberry.md
new file mode 100644
index 0000000000..c7e5b6c99a
--- /dev/null
+++ b/docs/en/connector-v2/sink/Cloudberry.md
@@ -0,0 +1,176 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Description
+
+Write data through JDBC. Cloudberry currently does not have its own native JDBC driver; it uses the PostgreSQL driver for connectivity and follows PostgreSQL's implementation.
+
+Supports batch and streaming modes, concurrent writing, and exactly-once
+semantics (guaranteed through XA transactions).
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in the `${SEATUNNEL_HOME}/plugins/` directory.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Key Features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+> `exactly-once` is guaranteed through `XA transactions`, so it is only supported for databases that
+> support `XA transactions`. You can set `is_exactly_once=true` to enable it.
+
+## Supported DataSource Info
+
+| Datasource |            Supported Versions            |        Driver          |                  Url                  |                                  Maven                                   |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query, or generate_sink_sql with database/table: How the data is written
+- is_exactly_once: Enable exactly-once semantics with XA transactions
+- batch_size: Control batch writing behavior
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    plugin_output = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "insert into test_table(name,age) values(?,?)"
+  }
+}
+```
+
+### Generate Sink SQL
+
+```hocon
+sink {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    
+    generate_sink_sql = true
+    database = "mydb"
+    table = "public.test_table"
+  }
+}
+```
+
+### Exactly-once:
+
+```hocon
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "insert into test_table(name,age) values(?,?)"
+    
+    is_exactly_once = "true"
+    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+  }
+}
+```
+
+### CDC(Change Data Capture) Event
+
+```hocon
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    
+    generate_sink_sql = true
+    database = "mydb"
+    table = "sink_table"
+    primary_keys = ["id","name"]
+    field_ide = UPPERCASE
+  }
+}
+```
+
+### Save mode function
+
+```hocon
+sink {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    
+    generate_sink_sql = true
+    database = "mydb"
+    table = "public.test_table"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
+
+For more detailed examples and options, please refer to the PostgreSQL connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Cloudberry.md 
b/docs/en/connector-v2/source/Cloudberry.md
new file mode 100644
index 0000000000..80880d6f98
--- /dev/null
+++ b/docs/en/connector-v2/source/Cloudberry.md
@@ -0,0 +1,152 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Source Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in the `${SEATUNNEL_HOME}/plugins/` directory.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in the `${SEATUNNEL_HOME}/lib/` directory.
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve a projection effect.
+
+## Description
+
+Read data from an external data source through JDBC. Cloudberry currently does not have its own native JDBC driver; it uses the PostgreSQL driver and follows PostgreSQL's implementation.
+
+## Supported DataSource Info
+
+| Datasource |            Supported Versions            |        Driver          |                  Url                  |                                  Maven                                   |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query or table_path: What data to read
+- partition options for parallel reading
+
+## Parallel Reader
+
+Cloudberry supports parallel reading following the same rules as the PostgreSQL connector. For detailed information on split strategies and parallel reading options, please refer to the PostgreSQL connector documentation.
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "select * from mytable limit 100"
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+### Parallel reading with table_path:
+
+```hocon
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    table_path = "public.mytable"
+    split.size = 10000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+### Multiple table read:
+
+```hocon
+env {
+  job.mode = "BATCH"
+  parallelism = 4
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    "table_list" = [
+      {
+        "table_path" = "public.table1"
+      },
+      {
+        "table_path" = "public.table2"
+      }
+    ]
+    split.size = 10000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+For more detailed examples and configurations, please refer to the PostgreSQL connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
new file mode 100644
index 0000000000..bb898d6761
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public class JdbcCloudberryIT extends AbstractJdbcIT {
+    private static final String CLOUDBERRY_IMAGE = "lhrbest/cbdb:1.5.4";
+    private static final String CLOUDBERRY_CONTAINER_HOST = "cbdb";
+    private static final String CLOUDBERRY_DATABASE = "postgres";
+
+    private static final String CLOUDBERRY_SCHEMA = "public";
+    private static final String CLOUDBERRY_SOURCE = "source";
+    private static final String CLOUDBERRY_SINK = "sink";
+
+    private static final String CLOUDBERRY_USERNAME = "gpadmin";
+    private static final String CLOUDBERRY_PASSWORD = "gpadmin";
+    private static final int CLOUDBERRY_CONTAINER_PORT = 5432;
+
+    private static final String CLOUDBERRY_URL = "jdbc:postgresql://" + HOST + 
":%s/%s";
+
+    private static final String DRIVER_CLASS = "org.postgresql.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_cloudberry_source_and_sink.conf");
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE %s (\n" + "age INT NOT NULL,\n" + "name VARCHAR(255) 
NOT NULL\n" + ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl =
+                String.format(CLOUDBERRY_URL, CLOUDBERRY_CONTAINER_PORT, 
CLOUDBERRY_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(CLOUDBERRY_SCHEMA, CLOUDBERRY_SOURCE, 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(CLOUDBERRY_IMAGE)
+                .networkAliases(CLOUDBERRY_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(CLOUDBERRY_CONTAINER_PORT)
+                .localPort(CLOUDBERRY_CONTAINER_PORT)
+                .jdbcTemplate(CLOUDBERRY_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(CLOUDBERRY_USERNAME)
+                .password(CLOUDBERRY_PASSWORD)
+                .database(CLOUDBERRY_SCHEMA)
+                .sourceTable(CLOUDBERRY_SOURCE)
+                .sinkTable(CLOUDBERRY_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .tablePathFullName(CLOUDBERRY_SOURCE)
+                .useSaveModeCreateTable(false)
+                .build();
+    }
+
+    @Override
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";;
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "age", "name",
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, "f_" + i,
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(CLOUDBERRY_IMAGE);
+        GenericContainer<?> container =
+                new GenericContainer<>(imageName)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CLOUDBERRY_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(CLOUDBERRY_IMAGE)))
+                        .withCommand("/usr/sbin/init") // Ensure container 
starts correctly
+                        .withPrivilegedMode(true); // Set privileged mode
+        // Mount cgroup volume
+        container.addFileSystemBind("/sys/fs/cgroup", "/sys/fs/cgroup", 
BindMode.READ_ONLY);
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format(
+                                "%s:%s", CLOUDBERRY_CONTAINER_PORT, 
CLOUDBERRY_CONTAINER_PORT)));
+        return container;
+    }
+
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+
+    @Override
+    public void clearTable(String schema, String table) {
+        // do nothing.
+    }
+
+    @Override
+    protected void beforeStartUP() {
+        log.info("Setting up Apache Cloudberry...");
+        try {
+            // Wait for container to start
+            Thread.sleep(5000);
+            // Switch to gpadmin user and start database
+            Container.ExecResult execResult =
+                    dbServer.execInContainer("bash", "-c", "su - gpadmin -c 
'gpstart -a'");
+            log.info("gpstart result: {}", execResult.getStdout());
+            // Set gpadmin password
+            execResult =
+                    dbServer.execInContainer(
+                            "bash",
+                            "-c",
+                            "su - gpadmin -c \"psql -c \\\"ALTER USER gpadmin 
WITH PASSWORD 'gpadmin';\\\"\"");
+            log.info("Set password result: {}", execResult.getStdout());
+            // Confirm database is started
+            execResult =
+                    dbServer.execInContainer(
+                            "bash", "-c", "su - gpadmin -c 'psql -c \"SELECT 
version();\"'");
+            log.info("Apache Cloudberry version: {}", execResult.getStdout());
+
+        } catch (InterruptedException | IOException e) {
+            log.error("Failed to initialize Apache Cloudberry", e);
+            throw new RuntimeException("Failed to initialize Apache 
Cloudberry", e);
+        }
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        dbServer = 
initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
+        Startables.deepStart(Stream.of(dbServer)).join();
+        jdbcCase = getJdbcCase();
+        beforeStartUP();
+        // Increase retry count and timeout, CloudberryDB might need more time 
to start
+        given().ignoreExceptions()
+                .await()
+                .atMost(600, TimeUnit.SECONDS) // Increase waiting time
+                .pollInterval(10, TimeUnit.SECONDS) // Set polling interval
+                .untilAsserted(() -> 
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+        createSchemaIfNeeded();
+        createNeededTables();
+        insertTestData();
+        initCatalog();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
new file mode 100644
index 0000000000..af574a7655
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://cbdb:5432/postgres"
+    user = gpadmin
+    password = gpadmin
+    query = "select age, name from source"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see the full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://cbdb:5432/postgres"
+    user = gpadmin
+    password = gpadmin
+    query = "insert into sink(age, name) values(?, ?)"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file

Reply via email to