This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 7e0008e6f [Improve] [Connector-V2] [StarRocks] Starrocks Support Auto 
Create Table (#4177)
7e0008e6f is described below

commit 7e0008e6fbcbb238bc86f97988618732b41127f1
Author: Hisoka <[email protected]>
AuthorDate: Tue Feb 21 17:22:20 2023 +0800

    [Improve] [Connector-V2] [StarRocks] Starrocks Support Auto Create Table 
(#4177)
---
 docs/en/connector-v2/sink/StarRocks.md             |  30 +-
 .../seatunnel/api/sink/SaveModeConstants.java      |  29 ++
 .../seatunnel/api/table/catalog/CatalogTable.java  |  11 +
 .../seatunnel/api/table/catalog/TablePath.java     |   4 +
 .../seatunnel/jdbc/catalog/MySqlCatalog.java       |   2 +-
 .../kafka/catalog/KafkaDataTypeConvertor.java      |   3 +
 .../connector-starrocks/pom.xml                    |   7 +
 .../starrocks/catalog/StarRocksCatalog.java        | 415 +++++++++++++++++++++
 .../catalog/StarRocksDataTypeConvertor.java        | 194 ++++++++++
 .../starrocks/client/StarRocksSinkManager.java     |   5 +-
 .../seatunnel/starrocks/config/SinkConfig.java     |  31 +-
 .../starrocks/sink/StarRocksSaveModeUtil.java      |  88 +++++
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  55 ++-
 .../starrocks/sink/StarRocksSinkFactory.java       |   9 +-
 .../starrocks/sink/StarRocksSinkWriter.java        |   5 +-
 .../seatunnel/starrocks/StarRocksCatalogTest.java  |  82 ++++
 16 files changed, 948 insertions(+), 22 deletions(-)

diff --git a/docs/en/connector-v2/sink/StarRocks.md 
b/docs/en/connector-v2/sink/StarRocks.md
index 5bdddffbc..207823bda 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -18,7 +18,7 @@ The internal implementation of StarRocks sink connector is 
cached and imported b
 | username                    | string  | yes      | -               |
 | password                    | string  | yes      | -               |
 | database                    | string  | yes      | -               |
-| table                       | string  | yes      | -               |
+| table                       | string  | no       | -               |
 | labelPrefix                 | string  | no       | -               |
 | batch_max_rows              | long    | no       | 1024            |
 | batch_max_bytes             | int     | no       | 5 * 1024 * 1024 |
@@ -27,6 +27,7 @@ The internal implementation of StarRocks sink connector is 
cached and imported b
 | retry_backoff_multiplier_ms | int     | no       | -               |
 | max_retry_backoff_ms        | int     | no       | -               |
 | enable_upsert_delete        | boolean | no       | false           |
+| save_mode_create_template   | string  | no       | see below       |
 | starrocks.config            | map     | no       | -               |
 
 ### node_urls [list]
@@ -47,7 +48,7 @@ The name of StarRocks database
 
 ### table [string]
 
-The name of StarRocks table
+The name of StarRocks table, If not set, the table name will be the name of 
the upstream table
 
 ### labelPrefix [string]
 
@@ -81,6 +82,31 @@ The amount of time to wait before attempting to retry a 
request to `StarRocks`
 
 Whether to enable upsert/delete, only supports PrimaryKey model.
 
+### save_mode_create_template [string]
+
+We use templates to automatically create starrocks tables,
+which will create corresponding table creation statements based on the type of 
upstream data and schema type,
+and the default template can be modified according to the situation
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(
+    ${rowtype_fields}
+) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
+    PROPERTIES
+(
+    "replication_num" = "1"
+);
+```
+
+You can use the following placeholders
+
+- database: Used to get the database in the upstream schema
+- table_name: Used to get the table name in the upstream schema
+- rowtype_fields: Used to get all the fields in the upstream schema, we will 
automatically map to the field
+  description of StarRocks
+- rowtype_primary_key: Used to get the primary key in the upstream schema 
(maybe a list)
+
 ### starrocks.config  [map]
 
 The parameter of the stream load `data_desc`
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
new file mode 100644
index 000000000..e3e56089a
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public class SaveModeConstants {
+
+    public static final String ROWTYPE_PRIMARY_KEY = "rowtype_primary_key";
+    public static final String ROWTYPE_FIELDS = "rowtype_fields";
+
+    public static final String TABLE_NAME = "table_name";
+
+    public static final String DATABASE = "database";
+
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index 1c8542300..9a8c6817f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -90,4 +90,15 @@ public final class CatalogTable implements Serializable {
     public String getComment() {
         return comment;
     }
+
+    @Override
+    public String toString() {
+        return "CatalogTable{" +
+            "tableId=" + tableId +
+            ", tableSchema=" + tableSchema +
+            ", options=" + options +
+            ", partitionKeys=" + partitionKeys +
+            ", comment='" + comment + '\'' +
+            '}';
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 61f61d936..0de425e63 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -58,6 +58,10 @@ public final class TablePath implements Serializable {
         return String.format("%s.%s", databaseName, tableName);
     }
 
+    public String getFullNameWithQuoted() {
+        return String.format("`%s`.`%s`", databaseName, tableName);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index c4b8abcab..429de5683 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -127,7 +127,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, 
tablePath.getDatabaseName(), tablePath.getTableName());
             List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, 
tablePath.getDatabaseName(), tablePath.getTableName());
 
-            try (PreparedStatement ps = 
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;", 
tablePath.getFullName()))) {
+            try (PreparedStatement ps = 
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;", 
tablePath.getFullNameWithQuoted()))) {
                 ResultSetMetaData tableMetaData = ps.getMetaData();
                 TableSchema.Builder builder = TableSchema.builder();
                 // add column
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
index 99e0ab714..3a879e494 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import com.google.auto.service.AutoService;
+
 import java.util.Map;
 
 /**
@@ -39,6 +41,7 @@ import java.util.Map;
  * </pre>
  * <p> Right now the data type of kafka is SeaTunnelType, so we don't need to 
convert the data type.
  */
+@AutoService(DataTypeConvertor.class)
 public class KafkaDataTypeConvertor implements 
DataTypeConvertor<SeaTunnelDataType<?>> {
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml 
b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index f7992271b..232e358e0 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -31,6 +31,7 @@
     <properties>
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
+        <mysql.version>8.0.16</mysql.version>
     </properties>
 
     <dependencies>
@@ -44,6 +45,12 @@
             <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
new file mode 100644
index 000000000..46fdb12b7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import com.mysql.cj.MysqlType;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class StarRocksCatalog implements Catalog {
+
+    protected final String catalogName;
+    protected final String defaultDatabase;
+    protected final String username;
+    protected final String pwd;
+    protected final String baseUrl;
+    protected final String defaultUrl;
+
+    private static final Set<String> SYS_DATABASES = new HashSet<>();
+    private static final Logger LOG = 
LoggerFactory.getLogger(StarRocksCatalog.class);
+
+    static {
+        SYS_DATABASES.add("information_schema");
+        SYS_DATABASES.add("_statistics_");
+    }
+
+    public StarRocksCatalog(
+        String catalogName,
+        String username,
+        String pwd,
+        String defaultUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username));
+        checkArgument(StringUtils.isNotBlank(pwd));
+        checkArgument(StringUtils.isNotBlank(defaultUrl));
+
+        defaultUrl = defaultUrl.trim();
+        validateJdbcUrlWithDatabase(defaultUrl);
+        this.catalogName = catalogName;
+        this.username = username;
+        this.pwd = pwd;
+        this.defaultUrl = defaultUrl;
+        String[] strings = splitDefaultUrl(defaultUrl);
+        this.baseUrl = strings[0];
+        this.defaultDatabase = strings[1];
+    }
+
+    public StarRocksCatalog(
+        String catalogName,
+        String defaultDatabase,
+        String username,
+        String pwd,
+        String baseUrl) {
+
+        checkArgument(StringUtils.isNotBlank(username));
+        checkArgument(StringUtils.isNotBlank(pwd));
+        checkArgument(StringUtils.isNotBlank(baseUrl));
+
+        baseUrl = baseUrl.trim();
+        validateJdbcUrlWithoutDatabase(baseUrl);
+        this.catalogName = catalogName;
+        this.defaultDatabase = defaultDatabase;
+        this.username = username;
+        this.pwd = pwd;
+        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+        this.defaultUrl = this.baseUrl + defaultDatabase;
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+
+            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+
+            List<String> databases = new ArrayList<>();
+            ResultSet rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String databaseName = rs.getString(1);
+                if (!SYS_DATABASES.contains(databaseName)) {
+                    databases.add(rs.getString(1));
+                }
+            }
+
+            return databases;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
this.catalogName), e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws 
CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
+        }
+
+        try (Connection conn = DriverManager.getConnection(baseUrl + 
databaseName, username, pwd)) {
+            PreparedStatement ps =
+                conn.prepareStatement("SHOW TABLES;");
+
+            ResultSet rs = ps.executeQuery();
+
+            List<String> tables = new ArrayList<>();
+
+            while (rs.next()) {
+                tables.add(rs.getString(1));
+            }
+
+            return tables;
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath) throws CatalogException, 
TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, 
pwd)) {
+            Optional<PrimaryKey> primaryKey = 
getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
+
+            PreparedStatement ps =
+                conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 
= 0;", tablePath.getFullNameWithQuoted()));
+
+            ResultSetMetaData tableMetaData = ps.getMetaData();
+
+            TableSchema.Builder builder = TableSchema.builder();
+            for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+                SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+                // TODO add default value and test it
+                
builder.column(PhysicalColumn.of(tableMetaData.getColumnName(i), type, 
tableMetaData.getColumnDisplaySize(i),
+                    tableMetaData.isNullable(i) == 
ResultSetMetaData.columnNullable, null,
+                    tableMetaData.getColumnLabel(i)));
+            }
+
+            primaryKey.ifPresent(builder::primaryKey);
+
+            TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, 
tablePath.getDatabaseName(), tablePath.getTableName());
+            return CatalogTable.of(tableIdentifier, builder.build(), 
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed getting table 
%s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) 
throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) 
throws DatabaseAlreadyExistException, CatalogException {
+        try (Connection conn = DriverManager.getConnection(baseUrl, username, 
pwd)) {
+            if (ignoreIfExists) {
+                conn.createStatement().execute("CREATE DATABASE IF NOT EXISTS 
`" + tablePath.getDatabaseName() + "`");
+            } else {
+                conn.createStatement().execute("CREATE DATABASE `" + 
tablePath.getDatabaseName() + "`");
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) 
throws DatabaseNotExistException, CatalogException {
+        try (Connection conn = DriverManager.getConnection(baseUrl, username, 
pwd)) {
+            if (ignoreIfNotExists) {
+                conn.createStatement().execute("DROP DATABASE IF EXISTS `" + 
tablePath.getDatabaseName() + "`");
+            } else {
+                conn.createStatement().execute(String.format("DROP DATABASE 
`%s`", tablePath.getDatabaseName()));
+            }
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    /**
+     * @see com.mysql.cj.MysqlType
+     */
+    private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        MysqlType starrocksType = 
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
+        switch (starrocksType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                int precision = metadata.getPrecision(colIndex);
+                int scale = metadata.getScale(colIndex);
+                return new DecimalType(precision, scale);
+            default:
+                throw new 
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support Starrocks type '%s' yet", 
starrocksType.getName()));
+        }
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "starrocks");
+        options.put("url", baseUrl + tablePath.getDatabaseName());
+        options.put("table-name", tablePath.getFullName());
+        options.put("username", username);
+        options.put("password", pwd);
+        return options;
+    }
+
+    public void createTable(String sql) throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+        try (Connection conn = DriverManager.getConnection(baseUrl + 
getDefaultDatabase(), username, pwd)) {
+            conn.createStatement().execute(sql);
+        } catch (Exception e) {
+            throw new CatalogException(
+                String.format("Failed listing database in catalog %s", 
catalogName), e);
+        }
+    }
+
+    /**
+     * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
+     * "jdbc:mysql://localhost:5432" rather than 
"jdbc:mysql://localhost:5432/db".
+     */
+    public static void validateJdbcUrlWithoutDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+
+        checkArgument(parts.length == 2);
+    }
+
+    /**
+     * URL has to be with database, like "jdbc:mysql://localhost:5432/db" 
rather than "jdbc:mysql://localhost:5432/".
+     */
+    @SuppressWarnings("MagicNumber")
+    public static void validateJdbcUrlWithDatabase(String url) {
+        String[] parts = url.trim().split("\\/+");
+        checkArgument(parts.length == 3);
+    }
+
+    /**
+     * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
+     *
+     * @return The array size is fixed at 2, index 0 is base url, and index 1 
is default database.
+     */
+    public static String[] splitDefaultUrl(String defaultUrl) {
+        String[] res = new String[2];
+        int index = defaultUrl.lastIndexOf("/") + 1;
+        res[0] = defaultUrl.substring(0, index);
+        res[1] = defaultUrl.substring(index);
+        return res;
+    }
+
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            // test connection, fail early if we cannot connect to database
+            conn.getCatalog();
+        } catch (SQLException e) {
+            throw new CatalogException(
+                String.format("Failed connecting to %s via JDBC.", 
defaultUrl), e);
+        }
+
+        LOG.info("Catalog {} established connection to {}", catalogName, 
defaultUrl);
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        LOG.info("Catalog {} closing", catalogName);
+    }
+
+    protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) 
throws SQLException {
+
+        List<String> pkFields = new ArrayList<>();
+        try (Connection conn = DriverManager.getConnection(defaultUrl, 
username, pwd)) {
+            ResultSet rs = conn.createStatement().executeQuery(
+                String.format("SELECT COLUMN_NAME FROM 
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND 
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION", schema, table));
+            while (rs.next()) {
+                String columnName = rs.getString("COLUMN_NAME");
+                pkFields.add(columnName);
+            }
+        }
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate a unique 
name in that case
+            String pkName = "pk_" + String.join("_", pkFields);
+            return Optional.of(PrimaryKey.of(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        checkArgument(StringUtils.isNotBlank(databaseName));
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        try {
+            return databaseExists(tablePath.getDatabaseName())
+                && 
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
new file mode 100644
index 000000000..ece895d98
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.mysql.cj.MysqlType;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class StarRocksDataTypeConvertor implements 
DataTypeConvertor<MysqlType> {
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+
+    public static final Integer DEFAULT_PRECISION = 10;
+
+    public static final Integer DEFAULT_SCALE = 0;
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        checkNotNull(connectorDataType, "connectorDataType can not be null");
+        MysqlType mysqlType = MysqlType.getByName(connectorDataType);
+        Map<String, Object> dataTypeProperties;
+        switch (mysqlType) {
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                // parse precision and scale
+                int left = connectorDataType.indexOf("(");
+                int right = connectorDataType.indexOf(")");
+                int precision = DEFAULT_PRECISION;
+                int scale = DEFAULT_SCALE;
+                if (left != -1 && right != -1) {
+                    String[] precisionAndScale = 
connectorDataType.substring(left + 1, right).split(",");
+                    if (precisionAndScale.length == 2) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                        scale = Integer.parseInt(precisionAndScale[1]);
+                    } else if (precisionAndScale.length == 1) {
+                        precision = Integer.parseInt(precisionAndScale[0]);
+                    }
+                }
+                dataTypeProperties = ImmutableMap.of(PRECISION, precision, 
SCALE, scale);
+                break;
+            default:
+                dataTypeProperties = Collections.emptyMap();
+                break;
+        }
+        return toSeaTunnelType(mysqlType, dataTypeProperties);
+    }
+
+    // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType 
doesn't contains properties.
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType, 
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(mysqlType, "mysqlType can not be null");
+
+        switch (mysqlType) {
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case BIT:
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case TINYINT_UNSIGNED:
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case SMALLINT_UNSIGNED:
+            case INT:
+            case MEDIUMINT:
+            case MEDIUMINT_UNSIGNED:
+                return BasicType.INT_TYPE;
+            case INT_UNSIGNED:
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+                return BasicType.DOUBLE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIMESTAMP:
+            case DATETIME:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            // TODO: to confirm
+            case CHAR:
+            case VARCHAR:
+            case TINYTEXT:
+            case TEXT:
+            case MEDIUMTEXT:
+            case LONGTEXT:
+            case JSON:
+            case ENUM:
+                return BasicType.STRING_TYPE;
+            case BINARY:
+            case VARBINARY:
+            case TINYBLOB:
+            case BLOB:
+            case MEDIUMBLOB:
+            case LONGBLOB:
+            case GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case BIGINT_UNSIGNED:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+                Integer precision = MapUtils.getInteger(dataTypeProperties, 
PRECISION, DEFAULT_PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, 
DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            // TODO: support 'SET' & 'YEAR' type
+            default:
+                throw 
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+        }
+    }
+
+    @Override
+    public MysqlType toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, 
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        // todo: verify
+        switch (sqlType) {
+            case ARRAY:
+            case MAP:
+            case ROW:
+            case STRING:
+                return MysqlType.VARCHAR;
+            case BOOLEAN:
+                return MysqlType.BOOLEAN;
+            case TINYINT:
+                return MysqlType.TINYINT;
+            case SMALLINT:
+                return MysqlType.SMALLINT;
+            case INT:
+                return MysqlType.INT;
+            case BIGINT:
+                return MysqlType.BIGINT;
+            case FLOAT:
+                return MysqlType.FLOAT;
+            case DOUBLE:
+                return MysqlType.DOUBLE;
+            case DECIMAL:
+                return MysqlType.DECIMAL;
+            case NULL:
+                return MysqlType.NULL;
+            case BYTES:
+                return MysqlType.BIT;
+            case DATE:
+                return MysqlType.DATE;
+            case TIME:
+                return MysqlType.DATETIME;
+            case TIMESTAMP:
+                return MysqlType.TIMESTAMP;
+            default:
+                throw new 
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
String.format("Doesn't support type '%s' yet", sqlType));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return "StarRocks";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index f51feff7f..0a12c8c50 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -41,15 +41,14 @@ public class StarRocksSinkManager {
     private final SinkConfig sinkConfig;
     private final List<byte[]> batchList;
 
-    private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+    private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
     private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> scheduledFuture;
     private volatile boolean initialize;
     private volatile Exception flushException;
     private int batchRowCount = 0;
     private long batchBytesSize = 0;
-
-    private Integer batchIntervalMs;
+    private final Integer batchIntervalMs;
 
     public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) 
{
         this.sinkConfig = sinkConfig;
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 3320eab69..5ba7543c5 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -27,6 +27,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +35,7 @@ import java.util.Map;
 @Setter
 @Getter
 @ToString
-public class SinkConfig {
+public class SinkConfig implements Serializable {
 
     private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
     private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
@@ -79,6 +80,21 @@ public class SinkConfig {
         .withDescription("The parameter of the stream load data_desc. " +
             "The way to specify the parameter is to add the original stream 
load parameter into map");
 
+    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE = 
Options.key("save_mode_create_template")
+        .stringType()
+        .defaultValue("CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n" +
+            "${rowtype_fields}\n" +
+            ") ENGINE=OLAP\n" +
+            "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
+            "PROPERTIES (\n" +
+            "    \"replication_num\" = \"1\" \n" +
+            ")").withDescription("Create table statement template, used to 
create StarRocks table");
+
+    public static final Option<String> QUERY_PORT = Options.key("query_port")
+        .stringType()
+        .defaultValue("9030")
+        .withDescription("FE MySQL server port");
+
     public static final Option<Integer> BATCH_MAX_SIZE = 
Options.key("batch_max_rows")
         .intType()
         .defaultValue(DEFAULT_BATCH_MAX_SIZE)
@@ -125,6 +141,7 @@ public class SinkConfig {
     }
 
     private List<String> nodeUrls;
+    private String jdbcUrl;
     private String username;
     private String password;
     private String database;
@@ -142,17 +159,22 @@ public class SinkConfig {
     private int maxRetryBackoffMs;
     private boolean enableUpsertDelete;
 
+    private String saveModeCreateTemplate = 
SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+
     private final Map<String, Object> streamLoadProps = new HashMap<>();
 
     public static SinkConfig loadConfig(Config pluginConfig) {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS.key()));
         sinkConfig.setDatabase(pluginConfig.getString(DATABASE.key()));
-        sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
-
+        sinkConfig.setJdbcUrl("jdbc:mysql://" + 
sinkConfig.getNodeUrls().get(0).split(":")[0] +
+            ":" + pluginConfig.getString(QUERY_PORT.key()) + "/");
         if (pluginConfig.hasPath(USERNAME.key())) {
             sinkConfig.setUsername(pluginConfig.getString(USERNAME.key()));
         }
+        if (pluginConfig.hasPath(TABLE.key())) {
+            sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
+        }
         if (pluginConfig.hasPath(PASSWORD.key())) {
             sinkConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
         }
@@ -180,6 +202,9 @@ public class SinkConfig {
         if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) {
             
sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key()));
         }
+        if (pluginConfig.hasPath(SAVE_MODE_CREATE_TEMPLATE.key())) {
+            
sinkConfig.setSaveModeCreateTemplate(pluginConfig.getString(SAVE_MODE_CREATE_TEMPLATE.key()));
+        }
         parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
         if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
             sinkConfig.setColumnSeparator((String) 
sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
new file mode 100644
index 000000000..2179e77db
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.stream.Collectors;
+
+public class StarRocksSaveModeUtil {
+
+    public static String fillingCreateSql(String template, String database, 
String table, TableSchema tableSchema) {
+        String primaryKey = 
tableSchema.getPrimaryKey().getColumnNames().stream().map(r -> "`" + r + 
"`").collect(Collectors.joining(","));
+        String rowTypeFields = 
tableSchema.getColumns().stream().map(StarRocksSaveModeUtil::columnToStarrocksType)
+            .collect(Collectors.joining(",\n"));
+        return template.replace(String.format("${%s}", 
SaveModeConstants.DATABASE), database)
+            .replace(String.format("${%s}", SaveModeConstants.TABLE_NAME), 
table)
+            .replace(String.format("${%s}", SaveModeConstants.ROWTYPE_FIELDS), 
rowTypeFields)
+            .replace(String.format("${%s}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey);
+    }
+
+    static String columnToStarrocksType(Column column) {
+        checkNotNull(column, "The column is required.");
+        return String.format("`%s` %s %s %s", column.getName(), 
dataTypeToStarrocksType(column.getDataType()),
+            column.isNullable() ? "NULL" : "NOT NULL", 
column.getDefaultValue() == null ? "" : column.getDefaultValue().toString());
+    }
+
+    static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType) {
+        checkNotNull(dataType, "The SeaTunnel's data type is required.");
+        switch (dataType.getSqlType()) {
+            case NULL:
+            case TIME:
+                throw new IllegalArgumentException("Unsupported SeaTunnel's 
data type: " + dataType);
+            case STRING:
+            case BYTES:
+                return "STRING";
+            case BOOLEAN:
+                return "BOOLEAN";
+            case TINYINT:
+                return "TINYINT";
+            case SMALLINT:
+                return "SMALLINT";
+            case INT:
+                return "INT";
+            case BIGINT:
+                return "BIGINT";
+            case FLOAT:
+                return "FLOAT";
+            case DOUBLE:
+                return "DOUBLE";
+            case DATE:
+                return "DATE";
+            case TIMESTAMP:
+                return "DATETIME";
+            case ARRAY:
+                return "ARRAY<" + dataTypeToStarrocksType(((ArrayType<?, ?>) 
dataType).getElementType()) + ">";
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                return String.format("Decimal(%d, %d)", 
decimalType.getPrecision(), decimalType.getScale());
+            case MAP:
+            case ROW:
+                return "JSON";
+            default:
+        }
+        throw new IllegalArgumentException("Unsupported SeaTunnel's data type: 
" + dataType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 16f303742..c62300c46 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -25,8 +25,12 @@ import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkCon
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -35,18 +39,24 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
 
 @AutoService(SeaTunnelSink.class)
-public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> 
implements SupportDataSaveMode {
 
-    private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
-
+    private SinkConfig sinkConfig;
+    private DataSaveMode dataSaveMode;
     @Override
     public String getPluginName() {
         return "StarRocks";
@@ -54,13 +64,33 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
         if (!result.isSuccess()) {
             throw new 
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                 String.format("PluginName: %s, PluginType: %s, Message: %s",
                     getPluginName(), PluginType.SINK, result.getMsg()));
         }
+        // TODO get catalog Table
+        CatalogTable catalogTable = null;
+        sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        if (StringUtils.isEmpty(sinkConfig.getTable())) {
+            sinkConfig.setTable(catalogTable.getTableId().getTableName());
+        }
+        sinkConfig.setTable(catalogTable.getTableId().getTableName());
+        dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
+    }
+
+    private void autoCreateTable(String template) {
+        StarRocksCatalog starRocksCatalog = new StarRocksCatalog("StarRocks", 
sinkConfig.getDatabase(),
+            sinkConfig.getUsername(), sinkConfig.getPassword(), 
sinkConfig.getJdbcUrl());
+        if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
+            
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""), 
true);
+        }
+        // TODO get catalog Table
+        CatalogTable catalogTable = null;
+        if 
(!starRocksCatalog.tableExists(TablePath.of(sinkConfig.getDatabase(), 
sinkConfig.getTable()))) {
+            
starRocksCatalog.createTable(StarRocksSaveModeUtil.fillingCreateSql(template, 
sinkConfig.getDatabase(), sinkConfig.getTable(), 
catalogTable.getTableSchema()));
+        }
     }
 
     @Override
@@ -75,6 +105,21 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
-        return new StarRocksSinkWriter(pluginConfig, seaTunnelRowType);
+        return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
+    }
+
+    @Override
+    public DataSaveMode getDataSaveMode() {
+        return dataSaveMode;
+    }
+
+    @Override
+    public List<DataSaveMode> supportedDataSaveModeValues() {
+        return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
+    }
+
+    @Override
+    public void handleSaveMode(DataSaveMode saveMode) {
+        autoCreateTable(sinkConfig.getSaveModeCreateTemplate());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index b04750a94..8863c3115 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -34,10 +34,11 @@ public class StarRocksSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, 
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
-                .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, 
SinkConfig.BATCH_MAX_BYTES,
-                        SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, 
SinkConfig.MAX_RETRY_BACKOFF_MS,
-                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE)
+            .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, 
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
+            .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX, 
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
+                SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, 
SinkConfig.MAX_RETRY_BACKOFF_MS,
+                SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
+                SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index abe6e41cf..52647c34f 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -29,8 +29,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksIS
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
@@ -46,9 +44,8 @@ public class StarRocksSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     private final StarRocksISerializer serializer;
     private final StarRocksSinkManager manager;
 
-    public StarRocksSinkWriter(Config pluginConfig,
+    public StarRocksSinkWriter(SinkConfig sinkConfig,
                                SeaTunnelRowType seaTunnelRowType) {
-        SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
         List<String> fieldNames = 
Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
         if (sinkConfig.isEnableUpsertDelete()) {
             fieldNames.add(StarRocksSinkOP.COLUMN_KEY);
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
new file mode 100644
index 000000000..51a430f14
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@Disabled("Please Test it in your local environment")
+public class StarRocksCatalogTest {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StarRocksCatalogTest.class);
+
+    @Test
+    public void testCatalog() {
+        StarRocksCatalog catalog = new StarRocksCatalog("starrocks", "", 
"root", "123456", "jdbc:mysql://47.108.65.163:9030/");
+        List<String> databases = catalog.listDatabases();
+        LOGGER.info("find databases: " + databases);
+
+        if (!catalog.databaseExists("default")) {
+            catalog.createDatabase(TablePath.of("default", null), true);
+        }
+
+        databases = catalog.listDatabases();
+        LOGGER.info("find databases: " + databases);
+
+        catalog.createTable("CREATE TABLE IF NOT EXISTS `default`.`test` (\n" +
+            "`recruit_date`  DATE           NOT NULL COMMENT 
\"YYYY-MM-DD\",\n" +
+            "    `region_num`    TINYINT        COMMENT \"range [-128, 
127]\",\n" +
+            "    `num_plate`     SMALLINT       COMMENT \"range [-32768, 
32767] \",\n" +
+            "    `tel`           INT            COMMENT \"range [-2147483648, 
2147483647]\",\n" +
+            "    `id`            BIGINT         COMMENT \"range [-2^63 + 1 ~ 
2^63 - 1]\",\n" +
+            "    `password`      LARGEINT       COMMENT \"range [-2^127 + 1 ~ 
2^127 - 1]\",\n" +
+            "    `name`          CHAR(20)       NOT NULL COMMENT \"range 
char(m),m in (1-255)\",\n" +
+            "    `profile`       VARCHAR(500)   NOT NULL COMMENT \"upper limit 
value 1048576 bytes\",\n" +
+            "    `hobby`         STRING         NOT NULL COMMENT \"upper limit 
value 65533 bytes\",\n" +
+            "    `leave_time`    DATETIME       COMMENT \"YYYY-MM-DD 
HH:MM:SS\",\n" +
+            "    `channel`       FLOAT          COMMENT \"4 bytes\",\n" +
+            "    `income`        DOUBLE         COMMENT \"8 bytes\",\n" +
+            "    `account`       DECIMAL(12,4)  COMMENT \"\",\n" +
+            "    `ispass`        BOOLEAN        COMMENT \"true/false\"\n" +
+            ") ENGINE=OLAP\n" +
+            "DUPLICATE KEY(`recruit_date`, `region_num`)\n" +
+            "PARTITION BY RANGE(`recruit_date`)\n" +
+            "(\n" +
+            "    PARTITION p20220311 VALUES [('2022-03-11'), 
('2022-03-12')),\n" +
+            "    PARTITION p20220312 VALUES [('2022-03-12'), 
('2022-03-13')),\n" +
+            "    PARTITION p20220313 VALUES [('2022-03-13'), 
('2022-03-14')),\n" +
+            "    PARTITION p20220314 VALUES [('2022-03-14'), 
('2022-03-15')),\n" +
+            "    PARTITION p20220315 VALUES [('2022-03-15'), 
('2022-03-16'))\n" +
+            ")" +
+            "      DISTRIBUTED BY HASH (`id`)\n" +
+            "      PROPERTIES (\n" +
+            "            \"replication_num\" = \"1\" \n" +
+            "      );");
+        CatalogTable table = catalog.getTable(TablePath.of("default", "test"));
+        LOGGER.info("find table: " + table);
+    }
+
+}

Reply via email to