This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e09445c789 [Fix][Connector-V2] Fix SqlServer create table when
database with dot (#9007)
e09445c789 is described below
commit e09445c789e4790ae8018e56c113330d6e05e6b3
Author: Jia Fan <[email protected]>
AuthorDate: Mon Apr 7 15:36:15 2025 +0800
[Fix][Connector-V2] Fix SqlServer create table when database with dot
(#9007)
---
.../sqlserver/SqlServerCreateTableSqlBuilder.java | 4 +-
.../jdbc/internal/JdbcOutputFormatBuilder.java | 7 +--
.../seatunnel/jdbc/catalog/PreviewActionTest.java | 4 +-
.../SqlServerCreateTableSqlBuilderTest.java | 28 +++++------
.../jdbc/internal/JdbcOutputFormatBuilderTest.java | 58 ++++++++++++++++++++++
5 files changed, 77 insertions(+), 24 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
index 53ae23bb83..eda721d167 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
@@ -152,7 +152,7 @@ public class SqlServerCreateTableSqlBuilder {
tableAndColumnComment.append(
String.format(
"EXEC %s.sys.sp_addextendedproperty
'MS_Description', N'%s', 'schema', N'%s', 'table', N'%s';\n",
- tablePath.getDatabaseName(),
+ "[" + tablePath.getDatabaseName() + "]",
comment,
tablePath.getSchemaName(),
tablePath.getTableName()));
@@ -164,7 +164,7 @@ public class SqlServerCreateTableSqlBuilder {
tableAndColumnComment.append(
String.format(
columnComment,
- tablePath.getDatabaseName(),
+ "[" + tablePath.getDatabaseName() + "]",
com,
tablePath.getSchemaName(),
tablePath.getTableName(),
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 7748823ca4..30b313a79a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -61,11 +60,7 @@ public class JdbcOutputFormatBuilder {
JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
final String database = jdbcSinkConfig.getDatabase();
- final String table =
- dialect.extractTableName(
- TablePath.of(
- jdbcSinkConfig.getDatabase() + "." +
jdbcSinkConfig.getTable()));
-
+ final String table = jdbcSinkConfig.getTable();
final List<String> primaryKeys = jdbcSinkConfig.getPrimaryKeys();
if (jdbcSinkConfig.isUseCopyStatement()) {
statementExecutorFactory =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
index 06d85551a1..b430853a43 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
@@ -373,8 +373,8 @@ public class PreviewActionTest {
+ "CREATE TABLE [testddatabase].[testtable] ( \n"
+ "\t[test] NVARCHAR(MAX) NULL\n"
+ ");\n"
- + "EXEC testddatabase.sys.sp_addextendedproperty
'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
- + "EXEC testddatabase.sys.sp_addextendedproperty
'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column',
N'test';\n"
+ + "EXEC [testddatabase].sys.sp_addextendedproperty
'MS_Description', N'comment', 'schema', N'null', 'table', N'testtable';\n"
+ + "EXEC [testddatabase].sys.sp_addextendedproperty
'MS_Description', N'', 'schema', N'null', 'table', N'testtable', 'column',
N'test';\n"
+ "\n"
+ "END",
Optional.of(CATALOG_TABLE));
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java
index b1483af9b4..3f049187de 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilderTest.java
@@ -120,13 +120,13 @@ public class SqlServerCreateTableSqlBuilderTest {
+ "\t[lastUpdateTime] DATETIME2 NULL, \n"
+ "\tPRIMARY KEY ([id])\n"
+ ");\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table',
'column', N'blob_v';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table',
'column', N'createTime';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column',
N'name';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column',
N'id';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column',
N'age';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table',
'column', N'lastUpdateTime';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table',
'column', N'blob_v';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table',
'column', N'createTime';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column',
N'name';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column',
N'id';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column',
N'age';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table',
'column', N'lastUpdateTime';\n"
+ "\n"
+ "END";
@@ -149,13 +149,13 @@ public class SqlServerCreateTableSqlBuilderTest {
+ "\t[createTime] DATETIME2 NULL, \n"
+ "\t[lastUpdateTime] DATETIME2 NULL\n"
+ ");\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table',
'column', N'blob_v';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table',
'column', N'createTime';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column',
N'name';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column',
N'id';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column',
N'age';\n"
- + "EXEC test_database.sys.sp_addextendedproperty
'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table',
'column', N'lastUpdateTime';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'User table', 'schema', N'null', 'table', N'test_table';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'blob_v', 'schema', N'null', 'table', N'test_table',
'column', N'blob_v';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'createTime', 'schema', N'null', 'table', N'test_table',
'column', N'createTime';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'name', 'schema', N'null', 'table', N'test_table', 'column',
N'name';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'id', 'schema', N'null', 'table', N'test_table', 'column',
N'id';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'age', 'schema', N'null', 'table', N'test_table', 'column',
N'age';\n"
+ + "EXEC [test_database].sys.sp_addextendedproperty
'MS_Description', N'lastUpdateTime', 'schema', N'null', 'table', N'test_table',
'column', N'lastUpdateTime';\n"
+ "\n"
+ "END";
CONSOLE.println(expectSkipIndex);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
index c7f94a0ac6..6b855fc3fb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilderTest.java
@@ -17,16 +17,30 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.TestConnection;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlserverJdbcRowConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import java.io.IOException;
+import java.sql.SQLException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.function.Function;
public class JdbcOutputFormatBuilderTest {
@@ -71,4 +85,48 @@ public class JdbcOutputFormatBuilderTest {
updateAfter.setField(0, "2");
Assertions.assertNotEquals(keyExtractor.apply(insertRow),
keyExtractor.apply(updateAfter));
}
+
+ @Test
+ public void testBuildFormatWithDatabaseWithDot()
+ throws SQLException, ClassNotFoundException, IOException {
+
+ TableSchema schema =
+ TableSchema.builder()
+ .column(PhysicalColumn.of("id", BasicType.INT_TYPE,
22L, false, null, "id"))
+ .build();
+
+ Map<String, Object> config = new HashMap<>();
+ config.put("database", "databasewith.dot");
+ config.put("table", "dbo.tableName");
+
+ SqlServerDialect dialect = Mockito.mock(SqlServerDialect.class);
+ Mockito.when(dialect.getRowConverter()).thenReturn(new
SqlserverJdbcRowConverter());
+ Mockito.when(
+ dialect.getInsertIntoStatement(
+ Mockito.anyString(), Mockito.anyString(),
Mockito.any()))
+ .thenReturn("");
+
+ SimpleJdbcConnectionProvider provider =
Mockito.mock(SimpleJdbcConnectionProvider.class);
+ Mockito.when(provider.getOrEstablishConnection()).thenReturn(new
TestConnection());
+ Mockito.when(provider.getConnection()).thenReturn(new
TestConnection());
+
+ JdbcOutputFormat outputFormat =
+ new JdbcOutputFormatBuilder(
+ dialect,
+ provider,
+
JdbcSinkConfig.of(ReadonlyConfig.fromMap(config)),
+ schema,
+ schema)
+ .build();
+ outputFormat.open();
+
+ ArgumentCaptor<String> database =
ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<String> table = ArgumentCaptor.forClass(String.class);
+
+ Mockito.verify(dialect)
+ .getInsertIntoStatement(database.capture(), table.capture(),
Mockito.any());
+
+ Assertions.assertEquals("databasewith.dot", database.getValue());
+ Assertions.assertEquals("dbo.tableName", table.getValue());
+ }
}