This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ef3e61dbc4 [Improve][[Jdbc]sink sql support custom field.(#6515) 
(#6525)
ef3e61dbc4 is described below

commit ef3e61dbc41c3c41f5a93ef48dc282980dc7888c
Author: rtyuy <rt...@foxmail.com>
AuthorDate: Sat Jun 15 22:49:48 2024 +0800

    [Improve][[Jdbc]sink sql support custom field.(#6515) (#6525)
---
 .../executor/FieldNamedPreparedStatement.java      |  27 ++++-
 .../seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java | 122 +++++++++++++++++++++
 .../resources/jdbc_sink_name_parameter_sql.conf    |  64 +++++++++++
 3 files changed, 208 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index 29c98c7938..c98f50ba92 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -42,6 +42,7 @@ import java.sql.SQLXML;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
@@ -636,13 +637,29 @@ public class FieldNamedPreparedStatement implements 
PreparedStatement {
             HashMap<String, List<Integer>> parameterMap = new HashMap<>();
             parsedSQL = parseNamedStatement(sql, parameterMap);
             // currently, the statements must contain all the field parameters
-            checkArgument(parameterMap.size() == fieldNames.length);
+            parameterMap
+                    .keySet()
+                    .forEach(
+                            namedParameter -> {
+                                boolean namedParameterExist =
+                                        Arrays.asList(fieldNames).stream()
+                                                .anyMatch(field -> 
field.equals(namedParameter));
+                                checkArgument(
+                                        namedParameterExist,
+                                        String.format(
+                                                "Named parameters [%s] not in 
source columns, check SQL: %s",
+                                                namedParameter, sql));
+                            });
+
             for (int i = 0; i < fieldNames.length; i++) {
                 String fieldName = fieldNames[i];
-                checkArgument(
-                        parameterMap.containsKey(fieldName),
-                        fieldName + " doesn't exist in the parameters of SQL 
statement: " + sql);
-                indexMapping[i] = 
parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+                boolean parameterExist =
+                        parameterMap.keySet().stream()
+                                .anyMatch(parameter -> 
parameter.equals(fieldName));
+                indexMapping[i] =
+                        parameterExist
+                                ? 
parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray()
+                                : new int[0];
             }
         }
         log.info("PrepareStatement sql is:\n{}\n", parsedSQL);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java
new file mode 100644
index 0000000000..ca0b1b081c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class JdbcSinkNameParameterSQLIT extends TestSuiteBase implements 
TestResource {
+    private static final String PG_IMAGE = "postgres:14-alpine";
+    private static final String PG_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";;
+    private PostgreSQLContainer<?> postgreSQLContainer;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && 
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        postgreSQLContainer =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(TestSuiteBase.NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        Startables.deepStart(Stream.of(postgreSQLContainer)).join();
+        log.info("PostgreSQL container started");
+        Class.forName(postgreSQLContainer.getDriverClassName());
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+    }
+
+    @TestTemplate
+    public void testSinkNamedParameterSQL(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/jdbc_sink_name_parameter_sql.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection =
+                DriverManager.getConnection(
+                        postgreSQLContainer.getJdbcUrl(),
+                        postgreSQLContainer.getUsername(),
+                        postgreSQLContainer.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sink =
+                    "create table sink(\n"
+                            + "user_id BIGINT NOT NULL PRIMARY KEY,\n"
+                            + "name varchar(255),\n"
+                            + "age INT\n"
+                            + ")";
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table 
failed!", e);
+        }
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (postgreSQLContainer != null) {
+            postgreSQLContainer.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf
new file mode 100644
index 0000000000..50c04b07da
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      fields {
+        user_id = bigint
+        name = string
+        age = int
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "fake"
+    rules = {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 100
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 100
+        }
+      ]
+    }
+  }
+  Jdbc {
+    source_table_name = "fake"
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    query = "insert into public.sink (user_id, name) values(:user_id, :name)"
+
+  }
+}
\ No newline at end of file

Reply via email to