This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new edcaacecb1 [Feature][Connector] update sqlserver catalog for save mode (#6086)
edcaacecb1 is described below

commit edcaacecb13f432c0ce0022aa6febc2f9de450de
Author: 老王 <58297137+chl-...@users.noreply.github.com>
AuthorDate: Wed Jan 3 11:28:59 2024 +0800

    [Feature][Connector] update sqlserver catalog for save mode (#6086)
---
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 10 +++++++
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 35 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 478ef873c7..bf58323ba0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -237,4 +237,14 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         Connection defaultConnection = getConnection(defaultUrl);
         return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new SqlserverTypeMapper());
     }
+
+    @Override
+    public String getExistDataSql(TablePath tablePath) {
+        return String.format("select TOP 1 * from %s ;", tablePath.getFullNameWithQuoted("[", "]"));
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) throws CatalogException {
+        return String.format("TRUNCATE TABLE %s", tablePath.getFullNameWithQuoted("[", "]"));
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index e56fa37573..e871d81fd1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -17,14 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MSSQLServerContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -190,4 +195,34 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                         SQLSERVER_SCHEMA);
         catalog.open();
     }
+
+    @TestTemplate
+    public void testCatalog(TestContainer container) throws IOException, InterruptedException {
+        TablePath tablePathSqlserver = TablePath.of("master", "dbo", "source");
+        TablePath tablePathSqlserver_Sink = TablePath.of("master", "dbo", "sink_lw");
+        SqlServerCatalog sqlServerCatalog = (SqlServerCatalog) catalog;
+        CatalogTable catalogTable = sqlServerCatalog.getTable(tablePathSqlserver);
+        // sink tableExists ?
+        boolean tableExistsBefore = sqlServerCatalog.tableExists(tablePathSqlserver_Sink);
+        Assertions.assertFalse(tableExistsBefore);
+        // create table
+        sqlServerCatalog.createTable(tablePathSqlserver_Sink, catalogTable, true);
+        boolean tableExistsAfter = sqlServerCatalog.tableExists(tablePathSqlserver_Sink);
+        Assertions.assertTrue(tableExistsAfter);
+        // isExistsData ?
+        boolean existsDataBefore = sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
+        Assertions.assertFalse(existsDataBefore);
+        // insert one data
+        sqlServerCatalog.executeSql(
+                tablePathSqlserver_Sink, "insert into sink_lw(age, name) values(12,'laowang')");
+        boolean existsDataAfter = sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
+        Assertions.assertTrue(existsDataAfter);
+        // truncateTable
+        sqlServerCatalog.truncateTable(tablePathSqlserver_Sink, true);
+        Assertions.assertFalse(sqlServerCatalog.isExistsData(tablePathSqlserver_Sink));
+        // drop table
+        sqlServerCatalog.dropTable(tablePathSqlserver_Sink, true);
+        Assertions.assertFalse(sqlServerCatalog.tableExists(tablePathSqlserver_Sink));
+        sqlServerCatalog.close();
+    }
 }