This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04db40d973 [Fix] [Connector-V2] Postgres support for multiple primary 
keys (#8526)
04db40d973 is described below

commit 04db40d97372ffd5a572e0a8a54dfd33189c8529
Author: shirukai <308899...@qq.com>
AuthorDate: Fri Jan 17 20:34:06 2025 +0800

    [Fix] [Connector-V2] Postgres support for multiple primary keys (#8526)
---
 .../psql/PostgresCreateTableSqlBuilder.java        | 26 +++++++++++++++-------
 .../psql/PostgresCreateTableSqlBuilderTest.java    |  5 +++--
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
index 1fbfd7c095..1fb57f3e9f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java
@@ -73,6 +73,11 @@ public class PostgresCreateTableSqlBuilder {
                                                 buildColumnSql(column), 
fieldIde))
                         .collect(Collectors.toList());
 
+        // add primary key
+        if (createIndex && primaryKey != null) {
+            columnSqls.add("\t" + buildPrimaryKeySql());
+        }
+
         if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
             for (ConstraintKey constraintKey : constraintKeys) {
                 if (StringUtils.isBlank(constraintKey.getConstraintName())
@@ -134,14 +139,6 @@ public class PostgresCreateTableSqlBuilder {
         if (!column.isNullable()) {
             columnSql.append(" NOT NULL");
         }
-
-        // Add primary key directly after the column if it is a primary key
-        if (createIndex
-                && primaryKey != null
-                && primaryKey.getColumnNames().contains(column.getName())) {
-            columnSql.append(" PRIMARY KEY");
-        }
-
         return columnSql.toString();
     }
 
@@ -163,6 +160,19 @@ public class PostgresCreateTableSqlBuilder {
         return columnCommentSql.toString();
     }
 
+    private String buildPrimaryKeySql() {
+        String constraintName = UUID.randomUUID().toString().replace("-", "");
+        String primaryKeyColumns =
+                primaryKey.getColumnNames().stream()
+                        .map(
+                                column ->
+                                        String.format(
+                                                "\"%s\"",
+                                                
CatalogUtils.getFieldIde(column, fieldIde)))
+                        .collect(Collectors.joining(","));
+        return "CONSTRAINT \"" + constraintName + "\" PRIMARY KEY (" + 
primaryKeyColumns + ")";
+    }
+
     private String buildUniqueKeySql(ConstraintKey constraintKey) {
         String constraintName = UUID.randomUUID().toString().replace("-", "");
         String indexColumns =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
index 03b99b1ca0..cc820a4ed3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java
@@ -52,9 +52,10 @@ class PostgresCreateTableSqlBuilderTest {
                                             
catalogTable.getTableId().toTablePath());
                             String pattern =
                                     "CREATE TABLE \"test\" \\(\n"
-                                            + "\"id\" int4 NOT NULL PRIMARY 
KEY,\n"
+                                            + "\"id\" int4 NOT NULL,\n"
                                             + "\"name\" text NOT NULL,\n"
                                             + "\"age\" int4 NOT NULL,\n"
+                                            + "\tCONSTRAINT \"([a-zA-Z0-9]+)\" 
PRIMARY KEY \\(\"id\",\"name\"\\),\n"
                                             + "\tCONSTRAINT \"([a-zA-Z0-9]+)\" 
UNIQUE \\(\"name\"\\)\n"
                                             + "\\);";
                             Assertions.assertTrue(
@@ -142,7 +143,7 @@ class PostgresCreateTableSqlBuilderTest {
         TableSchema tableSchema =
                 TableSchema.builder()
                         .columns(columns)
-                        .primaryKey(PrimaryKey.of("pk_id", 
Lists.newArrayList("id")))
+                        .primaryKey(PrimaryKey.of("pk_id_name", 
Lists.newArrayList("id", "name")))
                         .constraintKey(
                                 Lists.newArrayList(
                                         ConstraintKey.of(

Reply via email to