This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new ef3e61dbc4 [Improve][[Jdbc]sink sql support custom field.(#6515) (#6525) ef3e61dbc4 is described below commit ef3e61dbc41c3c41f5a93ef48dc282980dc7888c Author: rtyuy <rt...@foxmail.com> AuthorDate: Sat Jun 15 22:49:48 2024 +0800 [Improve][[Jdbc]sink sql support custom field.(#6515) (#6525) --- .../executor/FieldNamedPreparedStatement.java | 27 ++++- .../seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java | 122 +++++++++++++++++++++ .../resources/jdbc_sink_name_parameter_sql.conf | 64 +++++++++++ 3 files changed, 208 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java index 29c98c7938..c98f50ba92 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java @@ -42,6 +42,7 @@ import java.sql.SQLXML; import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; import java.util.HashMap; import java.util.List; @@ -636,13 +637,29 @@ public class FieldNamedPreparedStatement implements PreparedStatement { HashMap<String, List<Integer>> parameterMap = new HashMap<>(); parsedSQL = parseNamedStatement(sql, parameterMap); // currently, the statements must contain all the field parameters - checkArgument(parameterMap.size() == fieldNames.length); + parameterMap + .keySet() + .forEach( + namedParameter -> { + boolean namedParameterExist = + Arrays.asList(fieldNames).stream() + .anyMatch(field -> field.equals(namedParameter)); + checkArgument( + namedParameterExist, + String.format( + "Named parameters [%s] not in source columns, check SQL: %s", + namedParameter, sql)); + }); + for (int i = 0; i < fieldNames.length; i++) { String fieldName = fieldNames[i]; - checkArgument( - parameterMap.containsKey(fieldName), - fieldName + " doesn't exist in the parameters of SQL statement: " + sql); - indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray(); + boolean parameterExist = + parameterMap.keySet().stream() + .anyMatch(parameter -> parameter.equals(fieldName)); + indexMapping[i] = + parameterExist + ? parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray() + : new int[0]; } } log.info("PrepareStatement sql is:\n{}\n", parsedSQL); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java new file mode 100644 index 0000000000..ca0b1b081c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkNameParameterSQLIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.given; + +@Slf4j +public class JdbcSinkNameParameterSQLIT extends TestSuiteBase implements TestResource { + private static final String PG_IMAGE = "postgres:14-alpine"; + private static final String PG_DRIVER_JAR = + "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar"; + private PostgreSQLContainer<?> postgreSQLContainer; + + @TestContainerExtension + private final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + + PG_DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode()); + }; + + @BeforeAll + @Override + public void startUp() throws Exception { + postgreSQLContainer = + new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE)) + .withNetwork(TestSuiteBase.NETWORK) + .withNetworkAliases("postgresql") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE))); + Startables.deepStart(Stream.of(postgreSQLContainer)).join(); + log.info("PostgreSQL container started"); + Class.forName(postgreSQLContainer.getDriverClassName()); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.MINUTES) + .untilAsserted(this::initializeJdbcTable); + } + + @TestTemplate + public void testSinkNamedParameterSQL(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/jdbc_sink_name_parameter_sql.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + private void initializeJdbcTable() { + try (Connection connection = + DriverManager.getConnection( + postgreSQLContainer.getJdbcUrl(), + postgreSQLContainer.getUsername(), + postgreSQLContainer.getPassword())) { + Statement statement = connection.createStatement(); + String sink = + "create table sink(\n" + + "user_id BIGINT NOT NULL PRIMARY KEY,\n" + + "name varchar(255),\n" + + "age INT\n" + + ")"; + statement.execute(sink); + } catch (SQLException e) { + throw new RuntimeException("Initializing PostgreSql table failed!", e); + } + } + + @AfterAll + @Override + public void tearDown() { + if (postgreSQLContainer != null) { + postgreSQLContainer.stop(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf new file mode 100644 index 0000000000..50c04b07da --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_sink_name_parameter_sql.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + row.num = 100 + schema = { + fields { + user_id = bigint + name = string + age = int + } + } + result_table_name = "fake" + } +} + +sink { + Assert { + source_table_name = "fake" + rules = { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 100 + }, + { + rule_type = MIN_ROW + rule_value = 100 + } + ] + } + } + Jdbc { + source_table_name = "fake" + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + database = test + query = "insert into public.sink (user_id, name) values(:user_id, :name)" + + } +} \ No newline at end of file