This is an automated email from the ASF dual-hosted git repository. tyrantlucifer pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 6673f6f771 [Feature][Connectors-V2][Jdbc] Supports Sqlserver Niche Data Types (#6122) 6673f6f771 is described below commit 6673f6f7711ba199765f489c4b798fd3a9a5cefa Author: 丑西蒙 <38197210+simonchou12...@users.noreply.github.com> AuthorDate: Wed Jan 3 19:55:00 2024 +0800 [Feature][Connectors-V2][Jdbc] Supports Sqlserver Niche Data Types (#6122) --- .../sqlserver/SqlServerDataTypeConvertor.java | 2 + .../dialect/sqlserver/SqlserverTypeMapper.java | 6 ++ .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 108 ++++++++++++++++++++- .../resources/jdbc_sqlserver_source_to_sink.conf | 6 +- 4 files changed, 115 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java index 5f88492643..d874c52345 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java @@ -81,6 +81,8 @@ public class SqlServerDataTypeConvertor implements DataTypeConvertor<SqlServerTy case NVARCHAR: case TEXT: case XML: + case GUID: + case SQL_VARIANT: return BasicType.STRING_TYPE; case DATE: return LocalTimeType.LOCAL_DATE_TYPE; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java index 2836441927..d727d71b7f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java @@ -58,6 +58,9 @@ public class SqlserverTypeMapper implements JdbcDialectTypeMapper { private static final String SQLSERVER_NCHAR = "NCHAR"; private static final String SQLSERVER_NVARCHAR = "NVARCHAR"; private static final String SQLSERVER_TEXT = "TEXT"; + private static final String SQLSERVER_XML = "XML"; + private static final String SQLSERVER_UNIQUEIDENTIFIER = "UNIQUEIDENTIFIER"; + private static final String SQLSERVER_SQLVARIANT = "SQL_VARIANT"; // ------------------------------time------------------------- private static final String SQLSERVER_DATE = "DATE"; @@ -105,6 +108,9 @@ public class SqlserverTypeMapper implements JdbcDialectTypeMapper { case SQLSERVER_NTEXT: case SQLSERVER_NVARCHAR: case SQLSERVER_TEXT: + case SQLSERVER_XML: + case SQLSERVER_UNIQUEIDENTIFIER: + case SQLSERVER_SQLVARIANT: return BasicType.STRING_TYPE; case SQLSERVER_DATE: return LocalTimeType.LOCAL_DATE_TYPE; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java index e871d81fd1..59e5ff0189 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java @@ -39,11 +39,17 @@ import org.testcontainers.utility.DockerLoggerFactory; import com.google.common.collect.Lists; import java.io.IOException; +import java.math.BigDecimal; import java.sql.SQLException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; public class JdbcSqlServerIT extends AbstractJdbcIT { @@ -66,9 +72,39 @@ public class JdbcSqlServerIT extends AbstractJdbcIT { Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf"); private static final String CREATE_SQL = "CREATE TABLE %s (\n" - + " [age] bigint NOT NULL,\n" - + " [name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL\n" - + ")"; + + "\tBIGINT_TEST bigint NOT NULL,\n" + + "\tBINARY_TEST binary(255) NULL,\n" + + "\tBIT_TEST bit NULL,\n" + + "\tCHAR_TEST char(255) COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tDATE_TEST date NULL,\n" + + "\tDATETIME_TEST datetime NULL,\n" + + "\tDATETIME2_TEST datetime2 NULL,\n" + + "\tDATETIMEOFFSET_TEST datetimeoffset NULL,\n" + + "\tDECIMAL_TEST decimal(18,2) NULL,\n" + + "\tFLOAT_TEST float NULL,\n" + + "\tIMAGE_TEST image NULL,\n" + + "\tINT_TEST int NULL,\n" + + "\tMONEY_TEST money NULL,\n" + + "\tNCHAR_TEST nchar(1) COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tNTEXT_TEST ntext COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tNUMERIC_TEST numeric(18,2) NULL,\n" + + "\tNVARCHAR_TEST nvarchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tNVARCHAR_MAX_TEST nvarchar(MAX) COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tREAL_TEST real NULL,\n" + + "\tSMALLDATETIME_TEST smalldatetime NULL,\n" + + "\tSMALLINT_TEST smallint NULL,\n" + + "\tSMALLMONEY_TEST smallmoney NULL,\n" + + "\tSQL_VARIANT_TEST sql_variant NULL,\n" + + "\tTEXT_TEST text COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tTIME_TEST time NULL,\n" + + "\tTINYINT_TEST tinyint NULL,\n" + + "\tUNIQUEIDENTIFIER_TEST uniqueidentifier NULL,\n" + + "\tVARBINARY_TEST varbinary(255) NULL,\n" + + "\tVARBINARY_MAX_TEST varbinary(MAX) NULL,\n" + + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n" + + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE Chinese_PRC_CS_AS DEFAULT NULL NULL,\n" + + "\tXML_TEST xml NULL\n" + + ");"; private String username; @@ -121,7 +157,38 @@ public class JdbcSqlServerIT extends AbstractJdbcIT { Pair<String[], List<SeaTunnelRow>> initTestData() { String[] fieldNames = new String[] { - "age", "name", + "BIGINT_TEST", + "BINARY_TEST", + "BIT_TEST", + "CHAR_TEST", + "DATE_TEST", + "DATETIME_TEST", + "DATETIME2_TEST", + "DATETIMEOFFSET_TEST", + "DECIMAL_TEST", + "FLOAT_TEST", + "IMAGE_TEST", + "INT_TEST", + "MONEY_TEST", + "NCHAR_TEST", + "NTEXT_TEST", + "NUMERIC_TEST", + "NVARCHAR_TEST", + "NVARCHAR_MAX_TEST", + "REAL_TEST", + "SMALLDATETIME_TEST", + "SMALLINT_TEST", + "SMALLMONEY_TEST", + "SQL_VARIANT_TEST", + "TEXT_TEST", + "TIME_TEST", + "TINYINT_TEST", + "UNIQUEIDENTIFIER_TEST", + "VARBINARY_TEST", + "VARBINARY_MAX_TEST", + "VARCHAR_TEST", + "VARCHAR_MAX_TEST", + "XML_TEST", }; List<SeaTunnelRow> rows = new ArrayList<>(); @@ -129,7 +196,38 @@ public class JdbcSqlServerIT extends AbstractJdbcIT { SeaTunnelRow row = new SeaTunnelRow( new Object[] { - i, "f_" + i, + (long) i, // BIGINT_TEST + new byte[255], // BINARY_TEST + i % 2 == 0, // BIT_TEST + "CharValue" + i, // CHAR_TEST + LocalDate.now(), // DATE_TEST + LocalDateTime.now(), // DATETIME_TEST + LocalDateTime.now(), // DATETIME2_TEST + OffsetDateTime.now(), // DATETIMEOFFSET_TEST + new BigDecimal("123.45"), // DECIMAL_TEST + 3.14f, // FLOAT_TEST + new byte[255], // IMAGE_TEST + 42, // INT_TEST + new BigDecimal("567.89"), // MONEY_TEST + "N", // NCHAR_TEST + "NTextValue" + i, // NTEXT_TEST + new BigDecimal("987.65"), // NUMERIC_TEST + "NVarCharValue" + i, // NVARCHAR_TEST + "NVarCharMaxValue" + i, // NVARCHAR_MAX_TEST + 2.71f, // REAL_TEST + LocalDateTime.now(), // SMALLDATETIME_TEST + (short) 123, // SMALLINT_TEST + new BigDecimal("456.78"), // SMALLMONEY_TEST + "SQL Variant Value" + i, // SQL_VARIANT_TEST + "TextValue" + i, // TEXT_TEST + LocalTime.now(), // TIME_TEST + (short) 5, // TINYINT_TEST + UUID.randomUUID(), // UNIQUEIDENTIFIER_TEST + new byte[255], // VARBINARY_TEST + new byte[8000], // VARBINARY_MAX_TEST + "VarCharValue" + i, // VARCHAR_TEST + "VarCharMaxValue" + i, // VARCHAR_MAX_TEST + "<xml>Test" + i + "</xml>", // XML_TEST }); rows.add(row); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf index 0046cdbcec..47eb53ffce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf @@ -30,7 +30,7 @@ source { url = "jdbc:sqlserver://sqlserver;encrypt=false;" user = SA password = "A_Str0ng_Required_Password" - query = "select age, name from source" + query = "select * from dbo.source" } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, @@ -49,7 +49,9 @@ sink { url = "jdbc:sqlserver://sqlserver;encrypt=false;" user = SA password = "A_Str0ng_Required_Password" - query = "insert into sink(age, name) values(?,?)" + database = "master" + table = "dbo.sink" + generate_sink_sql = true } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,