This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3561d3878 [Connector-V2][JDBC] Support database: greenplum (#2429)
3561d3878 is described below

commit 3561d3878f229555802abeaf4e7ed9ba8a49ba43
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 24 09:57:28 2022 +0800

    [Connector-V2][JDBC] Support database: greenplum (#2429)
    
    * [Connector-V2][JDBC] Support database: Greenplum
    
    Support connect greenplum drivers:
    * postgresql driver: org.postgresql.Driver
    * greenplum driver: com.pivotal.jdbc.GreenplumDriver
    
    Co-authored-by: wanghailin <[email protected]>
---
 docs/en/connector-v2/sink/Greenplum.md             |  27 ++++
 docs/en/connector-v2/source/Greenplum.md           |  17 +++
 .../dialect/greenplum/GreenplumDialectFactory.java |  40 ++++++
 .../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java      |   2 +-
 .../e2e/flink/v2/jdbc/JdbcGreenplumIT.java         | 159 +++++++++++++++++++++
 .../jdbc/jdbc_greenplum_source_and_sink.conf       |  60 ++++++++
 .../e2e/spark/v2/jdbc/JdbcGreenplumIT.java         | 159 +++++++++++++++++++++
 .../jdbc/jdbc_greenplum_source_and_sink.conf       |  62 ++++++++
 8 files changed, 525 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Greenplum.md 
b/docs/en/connector-v2/sink/Greenplum.md
new file mode 100644
index 000000000..9317e5c62
--- /dev/null
+++ b/docs/en/connector-v2/sink/Greenplum.md
@@ -0,0 +1,27 @@
+# Greenplum
+
+> Greenplum sink connector
+
+## Description
+
+Write data to Greenplum using [Jdbc connector](Jdbc.md).
+
+:::tip
+
+Not support exactly-once semantics (XA transaction is not yet supported in 
Greenplum database).
+
+:::
+
+## Options
+
+### driver [string]
+
+Optional jdbc drivers:
+- `org.postgresql.Driver`
+- `com.pivotal.jdbc.GreenplumDriver`
+
+Warning: for license compliance, if you use `GreenplumDriver` you have to provide the Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATUNNEL_HOME/lib for Standalone mode.
+
+### url [string]
+
+The URL of the JDBC connection. If you use the postgresql driver, the value is `jdbc:postgresql://${your_host}:${your_port}/${your_database}`; if you use the greenplum driver, the value is `jdbc:pivotal:greenplum://${your_host}:${your_port};DatabaseName=${your_database}`
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Greenplum.md 
b/docs/en/connector-v2/source/Greenplum.md
new file mode 100644
index 000000000..cd140549b
--- /dev/null
+++ b/docs/en/connector-v2/source/Greenplum.md
@@ -0,0 +1,17 @@
+# Greenplum
+
+> Greenplum source connector
+
+## Description
+
+Read Greenplum data through [Jdbc connector](Jdbc.md).
+
+:::tip
+
+Optional jdbc drivers:
+- `org.postgresql.Driver`
+- `com.pivotal.jdbc.GreenplumDriver`
+
+Warning: for license compliance, if you use `GreenplumDriver` you have to provide the Greenplum JDBC driver yourself, e.g. copy greenplum-xxx.jar to $SEATUNNEL_HOME/lib for Standalone mode.
+
+:::
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
new file mode 100644
index 000000000..fb4ca3865
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+
+import com.google.auto.service.AutoService;
+import lombok.NonNull;
+
+@AutoService(JdbcDialectFactory.class)
+public class GreenplumDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(@NonNull String url) {
+        // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
+        return url.startsWith("jdbc:pivotal:greenplum:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new PostgresDialect();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index 784bd9a06..9bc241bf4 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -89,7 +89,7 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
     }
 
     @AfterEach
-    public void closeClickHouseContainer() {
+    public void closePostgreSqlContainer() {
         if (psl != null) {
             psl.stop();
         }
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
new file mode 100644
index 000000000..715441032
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcGreenplumIT extends FlinkContainer {
+
+    private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
+    private static final String GREENPLUM_CONTAINER_HOST = 
"flink_e2e_greenplum";
+    private static final int GREENPLUM_CONTAINER_PORT = 5432;
+    private static final String GREENPLUM_HOST = "localhost";
+    private static final int GREENPLUM_PORT = 5435;
+    private static final String GREENPLUM_USER = "tester";
+    private static final String GREENPLUM_PASSWORD = "pivotal";
+    private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
+    private static final String GREENPLUM_JDBC_URL = String.format(
+            "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+    private static final List<List> TEST_DATASET = generateTestDataset();
+
+    private GenericContainer<?> greenplumServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, 
SQLException {
+        greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        greenplumServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", GREENPLUM_PORT, 
GREENPLUM_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(greenplumServer)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(GREENPLUM_DRIVER);
+        given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initializeJdbcConnection());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    @Test
+    public void testJdbcGreenplumSourceAndSink() throws IOException, 
InterruptedException, SQLException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/jdbc/jdbc_greenplum_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
+                GREENPLUM_USER, GREENPLUM_PASSWORD);
+    }
+
+    private void initializeJdbcTable() throws SQLException {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            String createSource = "CREATE TABLE source (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            String createSink = "CREATE TABLE sink (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            statement.execute(createSource);
+            statement.execute(createSink);
+        }
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = 
jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
new file mode 100644
index 000000000..e6ee34bf3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+    #execution.checkpoint.interval = 10000
+    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = 
"jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+        user = tester
+        password = pivotal
+        query = "select age, name from source"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = 
"jdbc:postgresql://flink_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+        user = tester
+        password = pivotal
+        query = "insert into sink(age, name) values(?, ?)"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
new file mode 100644
index 000000000..7910a0dde
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcGreenplumIT extends SparkContainer {
+
+    private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
+    private static final String GREENPLUM_CONTAINER_HOST = 
"spark_e2e_greenplum";
+    private static final int GREENPLUM_CONTAINER_PORT = 5432;
+    private static final String GREENPLUM_HOST = "localhost";
+    private static final int GREENPLUM_PORT = 5436;
+    private static final String GREENPLUM_USER = "tester";
+    private static final String GREENPLUM_PASSWORD = "pivotal";
+    private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
+    private static final String GREENPLUM_JDBC_URL = String.format(
+            "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+    private static final List<List> TEST_DATASET = generateTestDataset();
+
+    private GenericContainer<?> greenplumServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, 
SQLException {
+        greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        greenplumServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:%s", GREENPLUM_PORT, 
GREENPLUM_CONTAINER_PORT)));
+        Startables.deepStart(Stream.of(greenplumServer)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(GREENPLUM_DRIVER);
+        given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(() -> initializeJdbcConnection());
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    @Test
+    public void testJdbcGreenplumSourceAndSink() throws IOException, 
InterruptedException, SQLException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/jdbc/jdbc_greenplum_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<Object> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
+                GREENPLUM_USER, GREENPLUM_PASSWORD);
+    }
+
+    private void initializeJdbcTable() throws SQLException {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            String createSource = "CREATE TABLE source (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            String createSink = "CREATE TABLE sink (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            statement.execute(createSource);
+            statement.execute(createSink);
+        }
+    }
+
+    private static List<List> generateTestDataset() {
+        List<List> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = 
jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
new file mode 100644
index 000000000..6c06feb23
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/jdbc/jdbc_greenplum_source_and_sink.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = 
"jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+        user = tester
+        password = pivotal
+        query = "select age, name from source"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = 
"jdbc:postgresql://spark_e2e_greenplum:5432/testdb?loggerLevel=OFF"
+        user = tester
+        password = pivotal
+        query = "insert into sink(age, name) values(?, ?)"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file

Reply via email to