This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new edcaacecb1 [Feature][Connector] update sqlserver catalog for save mode (#6086)
edcaacecb1 is described below

commit edcaacecb13f432c0ce0022aa6febc2f9de450de
Author: 老王 <58297137+chl-...@users.noreply.github.com>
AuthorDate: Wed Jan 3 11:28:59 2024 +0800

    [Feature][Connector] update sqlserver catalog for save mode (#6086)
---
 .../jdbc/catalog/sqlserver/SqlServerCatalog.java   | 10 +++++++
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 35 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
index 478ef873c7..bf58323ba0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java
@@ -237,4 +237,14 @@ public class SqlServerCatalog extends AbstractJdbcCatalog {
         Connection defaultConnection = getConnection(defaultUrl);
         return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
SqlserverTypeMapper());
     }
+
+    @Override // Builds the T-SQL probe that selects at most one row (TOP 1) from the given table; only formats SQL text.
+    public String getExistDataSql(TablePath tablePath) { // presumably consumed by isExistsData(...) in the base catalog — confirm
+        return String.format("select TOP 1 * from %s ;", 
tablePath.getFullNameWithQuoted("[", "]")); // table name is quoted with "[" and "]"
+    }
+
+    @Override // Builds the TRUNCATE TABLE statement for the given table; only formats SQL text, nothing is executed here.
+    protected String getTruncateTableSql(TablePath tablePath) throws 
CatalogException { // declared in the signature, though no statement in this body throws it explicitly
+        return String.format("TRUNCATE TABLE  %s", 
tablePath.getFullNameWithQuoted("[", "]")); // table name is quoted with "[" and "]"
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index e56fa37573..e871d81fd1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -17,14 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerCatalog;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MSSQLServerContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -190,4 +195,34 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                         SQLSERVER_SCHEMA);
         catalog.open();
     }
+
+    @TestTemplate // Walks the catalog through create -> data probe -> insert -> truncate -> drop on a scratch sink table.
+    public void testCatalog(TestContainer container) throws IOException, 
InterruptedException {
+        TablePath tablePathSqlserver = TablePath.of("master", "dbo", "source"); // existing table whose definition is copied
+        TablePath tablePathSqlserver_Sink = TablePath.of("master", "dbo", 
"sink_lw"); // scratch table created and dropped by this test
+        SqlServerCatalog sqlServerCatalog = (SqlServerCatalog) catalog;
+        CatalogTable catalogTable = 
sqlServerCatalog.getTable(tablePathSqlserver);
+        // precondition: the sink table must not exist before this test creates it
+        boolean tableExistsBefore = 
sqlServerCatalog.tableExists(tablePathSqlserver_Sink);
+        Assertions.assertFalse(tableExistsBefore);
+        // create the sink table from the source table's definition (third argument true: presumably ignoreIfExists — confirm)
+        sqlServerCatalog.createTable(tablePathSqlserver_Sink, catalogTable, 
true);
+        boolean tableExistsAfter = 
sqlServerCatalog.tableExists(tablePathSqlserver_Sink);
+        Assertions.assertTrue(tableExistsAfter);
+        // a freshly created table must report that it holds no data
+        boolean existsDataBefore = 
sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
+        Assertions.assertFalse(existsDataBefore);
+        // insert a single row so the data probe is expected to turn true
+        sqlServerCatalog.executeSql(
+                tablePathSqlserver_Sink, "insert into sink_lw(age, name) 
values(12,'laowang')");
+        boolean existsDataAfter = 
sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
+        Assertions.assertTrue(existsDataAfter);
+        // truncating must leave the table empty again
+        sqlServerCatalog.truncateTable(tablePathSqlserver_Sink, true);
+        
Assertions.assertFalse(sqlServerCatalog.isExistsData(tablePathSqlserver_Sink));
+        // drop the sink table and verify it is gone. NOTE(review): not in a finally block, so a failed assertion above leaves sink_lw behind
+        sqlServerCatalog.dropTable(tablePathSqlserver_Sink, true);
+        
Assertions.assertFalse(sqlServerCatalog.tableExists(tablePathSqlserver_Sink));
+        sqlServerCatalog.close(); // NOTE(review): sqlServerCatalog is a cast of 'catalog', so this closes that object — confirm other tests/teardown tolerate it
+    }
 }

Reply via email to