This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

The following commit(s) were added to refs/heads/dev by this push: new e09445c789 [Fix][Connector-V2] Fix SqlServer create table when database with dot (#9007) e09445c789 is described below commit e09445c789e4790ae8018e56c113330d6e05e6b3 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Mon Apr 7 15:36:15 2025 +0800 [Fix][Connector-V2] Fix SqlServer create table when database with dot (#9007) --- .../sqlserver/SqlServerCreateTableSqlBuilder.java | 4 +- .../jdbc/internal/JdbcOutputFormatBuilder.java | 7 +-- .../seatunnel/jdbc/catalog/PreviewActionTest.java | 4 +- .../SqlServerCreateTableSqlBuilderTest.java | 28 +++++------ .../jdbc/internal/JdbcOutputFormatBuilderTest.java | 58 ++++++++++++++++++++++ 5 files changed, 77 insertions(+), 24 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index 53ae23bb83..eda721d167 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -152,7 +152,7 @@ public class SqlServerCreateTableSqlBuilder { tableAndColumnComment.append( String.format( "EXEC %s.sys.sp_addextendedproperty 'MS_Description', N'%s', 'schema', N'%s', 'table', N'%s';\n", - tablePath.getDatabaseName(), + "[" + tablePath.getDatabaseName() + "]", comment, tablePath.getSchemaName(), tablePath.getTableName())); @@ -164,7 +164,7 @@ public class SqlServerCreateTableSqlBuilder { tableAndColumnComment.append( String.format( columnComment, - tablePath.getDatabaseName(), + "[" + tablePath.getDatabaseName() + "]", com, 
tablePath.getSchemaName(), tablePath.getTableName(), diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java index 7748823ca4..30b313a79a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal; import org.apache.seatunnel.api.table.catalog.Column; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; @@ -61,11 +60,7 @@ public class JdbcOutputFormatBuilder { JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory; final String database = jdbcSinkConfig.getDatabase(); - final String table = - dialect.extractTableName( - TablePath.of( - jdbcSinkConfig.getDatabase() + "." 
+ jdbcSinkConfig.getTable())); - + final String table = jdbcSinkConfig.getTable(); final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys(); if (jdbcSinkConfig.isUseCopyStatement()) { statementExecutorFactory = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java index 06d85551a1..b430853a43 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java @@ -373,8 +373,8 @@ public class PreviewActionTest { + "CREATE TABLE [testddatabase].[testtable] ( \n" + "\t[test] NVARCHAR(MAX) NULL\n" + ");\n" - + "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n" - + "EXEC testddatabase.sys.sp_addextendedproperty 'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column', N'test';\n" + + "EXEC [testddatabase].sys.sp_addextendedproperty 'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n" + + "EXEC [testddatabase].sys.sp_addextendedproperty 'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column', N'test';\n" + "\n" + "END", Optional.of(CATALOG_TABLE)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java index b1483af9b4..3f049187de 100644 --- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java @@ -120,13 +120,13 @@ public class SqlServerCreateTableSqlBuilderTest { + "\t[lastUpdateTime] DATETIME2 NULL, \n" + "\tPRIMARY KEY ([id])\n" + ");\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 
'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n" + "\n" + "END"; @@ -149,13 +149,13 @@ public class SqlServerCreateTableSqlBuilderTest { + "\t[createTime] DATETIME2 NULL, \n" + "\t[lastUpdateTime] DATETIME2 NULL\n" + ");\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 'column', N'blob_v';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n" - + "EXEC test_database.sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table', 
'column', N'blob_v';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table', 'column', N'createTime';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column', N'name';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column', N'id';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column', N'age';\n" + + "EXEC [test_database].sys.sp_addextendedproperty 'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table', 'column', N'lastUpdateTime';\n" + "\n" + "END"; CONSOLE.println(expectSkipIndex); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java index c7f94a0ac6..6b855fc3fb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java @@ -17,16 +17,30 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import 
org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection; +import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlserverJdbcRowConverter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import java.io.IOException; +import java.sql.SQLException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; public class JdbcOutputFormatBuilderTest { @@ -71,4 +85,48 @@ public class JdbcOutputFormatBuilderTest { updateAfter.setField(0, "2"); Assertions.assertNotEquals(keyExtractor.apply(insertRow), keyExtractor.apply(updateAfter)); } + + @Test + public void testBuildFormatWithDatabaseWithDot() + throws SQLException, ClassNotFoundException, IOException { + + TableSchema schema = + TableSchema.builder() + .column(PhysicalColumn.of("id", BasicType.INT_TYPE, 22L, false, null, "id")) + .build(); + + Map<String, Object> config = new HashMap<>(); + config.put("database", "databasewith.dot"); + config.put("table", "dbo.tableName"); + + SqlServerDialect dialect = Mockito.mock(SqlServerDialect.class); + Mockito.when(dialect.getRowConverter()).thenReturn(new SqlserverJdbcRowConverter()); + Mockito.when( + dialect.getInsertIntoStatement( + Mockito.anyString(), Mockito.anyString(), Mockito.any())) + .thenReturn(""); + + SimpleJdbcConnectionProvider provider = Mockito.mock(SimpleJdbcConnectionProvider.class); + Mockito.when(provider.getOrEstablishConnection()).thenReturn(new TestConnection()); + Mockito.when(provider.getConnection()).thenReturn(new 
TestConnection()); + + JdbcOutputFormat outputFormat = + new JdbcOutputFormatBuilder( + dialect, + provider, + JdbcSinkConfig.of(ReadonlyConfig.fromMap(config)), + schema, + schema) + .build(); + outputFormat.open(); + + ArgumentCaptor<String> database = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> table = ArgumentCaptor.forClass(String.class); + + Mockito.verify(dialect) + .getInsertIntoStatement(database.capture(), table.capture(), Mockito.any()); + + Assertions.assertEquals("databasewith.dot", database.getValue()); + Assertions.assertEquals("dbo.tableName", table.getValue()); + } }