This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 04db40d973 [Fix] [Connector-V2] Postgres support for multiple primary keys (#8526) 04db40d973 is described below commit 04db40d97372ffd5a572e0a8a54dfd33189c8529 Author: shirukai <308899...@qq.com> AuthorDate: Fri Jan 17 20:34:06 2025 +0800 [Fix] [Connector-V2] Postgres support for multiple primary keys (#8526) --- .../psql/PostgresCreateTableSqlBuilder.java | 26 +++++++++++++++------- .../psql/PostgresCreateTableSqlBuilderTest.java | 5 +++-- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index 1fbfd7c095..1fb57f3e9f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -73,6 +73,11 @@ public class PostgresCreateTableSqlBuilder { buildColumnSql(column), fieldIde)) .collect(Collectors.toList()); + // add primary key + if (createIndex && primaryKey != null) { + columnSqls.add("\t" + buildPrimaryKeySql()); + } + if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) { for (ConstraintKey constraintKey : constraintKeys) { if (StringUtils.isBlank(constraintKey.getConstraintName()) @@ -134,14 +139,6 @@ public class PostgresCreateTableSqlBuilder { if (!column.isNullable()) { columnSql.append(" NOT NULL"); } - - // Add primary key directly after the column if it is a primary key - if (createIndex - && primaryKey != null - && primaryKey.getColumnNames().contains(column.getName())) { - columnSql.append(" PRIMARY KEY"); - } - return columnSql.toString(); } @@ -163,6 +160,19 @@ public class PostgresCreateTableSqlBuilder { return columnCommentSql.toString(); } + private String buildPrimaryKeySql() { + String constraintName = UUID.randomUUID().toString().replace("-", ""); + String primaryKeyColumns = + primaryKey.getColumnNames().stream() + .map( + column -> + String.format( + "\"%s\"", + CatalogUtils.getFieldIde(column, fieldIde))) + .collect(Collectors.joining(",")); + return "CONSTRAINT \"" + constraintName + "\" PRIMARY KEY (" + primaryKeyColumns + ")"; + } + private String buildUniqueKeySql(ConstraintKey constraintKey) { String constraintName = UUID.randomUUID().toString().replace("-", ""); String indexColumns = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java index 03b99b1ca0..cc820a4ed3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilderTest.java @@ -52,9 +52,10 @@ class PostgresCreateTableSqlBuilderTest { catalogTable.getTableId().toTablePath()); String pattern = "CREATE TABLE \"test\" \\(\n" - + "\"id\" int4 NOT NULL PRIMARY KEY,\n" + + "\"id\" int4 NOT NULL,\n" + "\"name\" text NOT NULL,\n" + "\"age\" int4 NOT NULL,\n" + + "\tCONSTRAINT \"([a-zA-Z0-9]+)\" PRIMARY KEY \\(\"id\",\"name\"\\),\n" + "\tCONSTRAINT \"([a-zA-Z0-9]+)\" UNIQUE \\(\"name\"\\)\n" + "\\);"; Assertions.assertTrue( @@ -142,7 +143,7 @@ class PostgresCreateTableSqlBuilderTest { TableSchema tableSchema = TableSchema.builder() .columns(columns) - .primaryKey(PrimaryKey.of("pk_id", Lists.newArrayList("id"))) + .primaryKey(PrimaryKey.of("pk_id_name", Lists.newArrayList("id", "name"))) .constraintKey( Lists.newArrayList( ConstraintKey.of(