This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b4d05171b [Fix][Connector-Jdbc]Fixed Vertica data source cannot
upsert data. (#9607)
7b4d05171b is described below
commit 7b4d05171bef2122f5619b2468560400d65b9f5f
Author: chestnufang <[email protected]>
AuthorDate: Wed Jul 23 10:04:48 2025 +0800
[Fix][Connector-Jdbc]Fixed Vertica data source cannot upsert data. (#9607)
---
.../jdbc/internal/JdbcOutputFormatBuilder.java | 3 +-
.../jdbc/internal/dialect/JdbcDialect.java | 20 ++++
.../internal/dialect/vertica/VerticaDialect.java | 29 ++++-
.../dialect/vertica/VerticaDialectTest.java | 124 +++++++++++++++++++++
4 files changed, 170 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 30b313a79a..ba161f1685 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -187,8 +187,7 @@ public class JdbcOutputFormatBuilder {
}
if (enableUpsert) {
Optional<String> upsertSQL =
- dialect.getUpsertStatement(
- database, table, tableSchema.getFieldNames(),
pkNames);
+ dialect.getUpsertStatementByTableSchema(database, table,
tableSchema, pkNames);
if (upsertSQL.isPresent()) {
return createSimpleExecutor(
upsertSQL.get(),
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 6ec44d92f8..6460bdc875 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
@@ -239,6 +240,25 @@ public interface JdbcDialect extends Serializable {
Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields);
+ /**
+ * Constructs the dialect's upsert statement if supported; such as MySQL's
{@code DUPLICATE KEY
+ * UPDATE}, or PostgreSQL's {@code ON CONFLICT... DO UPDATE SET..}.
+ *
+ * <p>If supported, the returned string will be used as a {@link
java.sql.PreparedStatement}.
+ * Fields in the statement must be in the same order as the columns
in the {@code tableSchema}
+ * parameter.
+ *
+ * <p>If the dialect does not support native upsert statements, the writer
will fall back to
+ * {@code SELECT ROW Exists} + {@code UPDATE}/{@code INSERT} which may
have poor performance.
+ *
+ * @return the dialect's {@code UPSERT} statement or {@link
Optional#empty()}.
+ */
+ default Optional<String> getUpsertStatementByTableSchema(
+ String database, String tableName, TableSchema tableSchema,
String[] uniqueKeyFields) {
+ return getUpsertStatement(
+ database, tableName, tableSchema.getFieldNames(),
uniqueKeyFields);
+ }
+
/**
* Different dialects optimize their PreparedStatement
*
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
index ac970d1564..1076a42984 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -17,6 +17,7 @@
package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -53,16 +54,36 @@ public class VerticaDialect implements JdbcDialect {
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getUpsertStatementByTableSchema(
+ String database, String tableName, TableSchema tableSchema,
String[] uniqueKeyFields) {
+ String[] fieldNames = tableSchema.getFieldNames();
List<String> nonUniqueKeyFields =
Arrays.stream(fieldNames)
.filter(fieldName ->
!Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
+ // Vertica JDBC currently requires explicitly specifying the data type
String valuesBinding =
- Arrays.stream(fieldNames)
- .map(fieldName -> ":" + fieldName + " " +
quoteIdentifier(fieldName))
+ tableSchema.getColumns().stream()
+ .map(
+ column -> {
+ String fieldName = column.getName();
+ String sourceType = column.getSourceType();
+ return "CAST("
+ + ":"
+ + fieldName
+ + " AS "
+ + sourceType
+ + ")"
+ + " AS "
+ + quoteIdentifier(fieldName);
+ })
.collect(Collectors.joining(", "));
- String usingClause = String.format("SELECT %s FROM DUAL",
valuesBinding);
+ String usingClause = String.format("SELECT %s ", valuesBinding);
String onConditions =
Arrays.stream(uniqueKeyFields)
.map(
@@ -77,7 +98,7 @@ public class VerticaDialect implements JdbcDialect {
.map(
fieldName ->
String.format(
- "TARGET.%s=SOURCE.%s",
+ "%s=SOURCE.%s",
quoteIdentifier(fieldName),
quoteIdentifier(fieldName)))
.collect(Collectors.joining(", "));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
new file mode 100644
index 0000000000..1928480f35
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+public class VerticaDialectTest {
+
+ @Test
+ void testUpsertStatementByTableSchema() {
+ final String dataBaseName = "test_database";
+ final String tableName = "test_table";
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "id",
+ BasicType.LONG_TYPE,
+ 22L,
+ 0,
+ false,
+ null,
+ "id",
+ "BIGINT",
+ new HashMap<>()))
+ .column(
+ PhysicalColumn.of(
+ "name",
+ BasicType.STRING_TYPE,
+ 128L,
+ 0,
+ false,
+ null,
+ "name",
+ "VARCHAR",
+ new HashMap<>()))
+ .column(
+ PhysicalColumn.of(
+ "age",
+ BasicType.INT_TYPE,
+ (Long) null,
+ 0,
+ true,
+ null,
+ "age",
+ "INT",
+ new HashMap<>()))
+ .column(
+ PhysicalColumn.of(
+ "createTime",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 3L,
+ 0,
+ true,
+ null,
+ "createTime",
+ "TIME",
+ new HashMap<>()))
+ .primaryKey(PrimaryKey.of("id",
Lists.newArrayList("id")))
+ .constraintKey(
+ Collections.singletonList(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.INDEX_KEY,
+ "name",
+ Lists.newArrayList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "name",
null)))))
+ .build();
+
+ VerticaDialect dialect = new VerticaDialect();
+ final String[] doUpdateKeyFields = {"id"};
+ final String[] doNothingKeyFields = {"id", "name", "age"};
+
+ String doUpdateSql =
+ dialect.getUpsertStatementByTableSchema(
+ dataBaseName, tableName, tableSchema,
doUpdateKeyFields)
+ .orElseThrow(
+ () ->
+ new AssertionError(
+ "Expected doUpdateSql String
to be present"));
+ Assertions.assertEquals(
+ doUpdateSql,
+ " MERGE INTO test_database.\"test_table\" TARGET USING (SELECT
CAST(:id AS BIGINT) AS \"id\", CAST(:name AS VARCHAR) AS \"name\", CAST(:age AS
INT) AS \"age\", CAST(:createTime AS TIME) AS \"createTime\" ) SOURCE ON
(TARGET.\"id\"=SOURCE.\"id\") WHEN MATCHED THEN UPDATE SET
\"name\"=SOURCE.\"name\", \"age\"=SOURCE.\"age\",
\"createTime\"=SOURCE.\"createTime\" WHEN NOT MATCHED THEN INSERT (\"id\",
\"name\", \"age\", \"createTime\") VALUES (SOURCE.\"id\", SOURCE.\"name\",
[...]
+
+ String upsertCreateTimeSQL =
+ dialect.getUpsertStatementByTableSchema(
+ dataBaseName, tableName, tableSchema,
doNothingKeyFields)
+ .orElseThrow(
+ () ->
+ new AssertionError(
+ "Expected doNothingSql String
to be present"));
+ Assertions.assertEquals(
+ upsertCreateTimeSQL,
+ " MERGE INTO test_database.\"test_table\" TARGET USING (SELECT
CAST(:id AS BIGINT) AS \"id\", CAST(:name AS VARCHAR) AS \"name\", CAST(:age AS
INT) AS \"age\", CAST(:createTime AS TIME) AS \"createTime\" ) SOURCE ON
(TARGET.\"id\"=SOURCE.\"id\" AND TARGET.\"name\"=SOURCE.\"name\" AND
TARGET.\"age\"=SOURCE.\"age\") WHEN MATCHED THEN UPDATE SET
\"createTime\"=SOURCE.\"createTime\" WHEN NOT MATCHED THEN INSERT (\"id\",
\"name\", \"age\", \"createTime\") VALUES (SOURCE.\"id\ [...]
+ }
+}