This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6f92fd79b [Feature][Clickhouse] Support sink savemode  (#8086)
e6f92fd79b is described below

commit e6f92fd79ba4b1bac41c863a7d855548b44c247d
Author: Jast <shengh...@apache.org>
AuthorDate: Wed Dec 4 20:14:18 2024 +0800

    [Feature][Clickhouse] Support sink savemode  (#8086)
---
 docs/en/connector-v2/sink/Clickhouse.md            |  61 +++++
 docs/en/connector-v2/source/Hive.md                |   2 +-
 docs/zh/connector-v2/sink/Clickhouse.md            |  65 ++++-
 docs/zh/connector-v2/source/Hive.md                |   4 +-
 .../api/common/SeaTunnelAPIErrorCode.java          |   5 +-
 .../connector-clickhouse/pom.xml                   |   5 +
 .../clickhouse/catalog/ClickhouseCatalog.java      | 274 +++++++++++++++++++++
 .../catalog/ClickhouseCatalogFactory.java          |  53 ++++
 .../catalog/ClickhouseTypeConverter.java           | 178 +++++++++++++
 .../clickhouse/config/ClickhouseConfig.java        |  47 ++++
 .../clickhouse/config/ClickhouseType.java          |  44 ++++
 .../clickhouse/sink/client/ClickhouseSink.java     | 131 +++++++++-
 .../sink/client/ClickhouseSinkFactory.java         |  99 +-------
 .../clickhouse/util/ClickhouseCatalogUtil.java     |  58 +++++
 .../seatunnel/clickhouse/util/ClickhouseProxy.java | 155 ++++++++++++
 .../clickhouse}/util/CreateTableParser.java        |   2 +-
 .../clickhouse/ClickhouseCreateTableTest.java      | 252 +++++++++++++++++++
 .../seatunnel/common/util/CatalogUtil.java}        | 110 ++-------
 .../seatunnel/common}/util/CreateTableParser.java  |   2 +-
 .../connectors/doris/util/DorisCatalogUtil.java    |   1 +
 .../starrocks/catalog/StarRocksCatalog.java        |  32 ++-
 .../starrocks/sink/StarRocksSaveModeUtil.java      | 147 +----------
 .../catalog/StarRocksCreateTableTest.java          |  25 +-
 .../seatunnel/clickhouse/ClickhouseIT.java         | 103 ++++++++
 ...ickhouse_with_create_schema_when_not_exist.conf |  68 +++++
 ...create_schema_when_not_exist_and_drop_data.conf |  68 +++++
 .../clickhouse_with_error_when_data_exists.conf    |  68 +++++
 ...lickhouse_with_error_when_schema_not_exist.conf |  68 +++++
 ...house_with_recreate_schema_and_append_data.conf |  68 +++++
 ...clickhouse_with_recreate_schema_and_custom.conf |  69 ++++++
 30 files changed, 1904 insertions(+), 360 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md 
b/docs/en/connector-v2/sink/Clickhouse.md
index 15d92f8c5f..ae67ceb232 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -59,8 +59,69 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | primary_key                           | String  | No       | -       | Mark the primary key column from clickhouse table, and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table. |
 | support_upsert                        | Boolean | No       | false   | Support upsert row by query primary key. |
 | allow_experimental_lightweight_delete | Boolean | No       | false   | Allow experimental lightweight delete based on `*MergeTree` table engine. |
+| schema_save_mode                      | Enum    | no       | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode. Please refer to the `schema_save_mode` section below. |
+| data_save_mode                        | Enum    | no       | APPEND_DATA | Data save mode. Please refer to the `data_save_mode` section below. |
+| save_mode_create_template             | string  | no       | see below | See below. |
 | common-options                        |         | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details. |
 
+### schema_save_mode[Enum]
+
+Before starting the synchronization task, choose different processing options for the existing table schema.
+Option descriptions:
+`RECREATE_SCHEMA`: Create the table if it does not exist; drop and recreate the table when saving.
+`CREATE_SCHEMA_WHEN_NOT_EXIST`: Create the table if it does not exist; skip if the table already exists.
+`ERROR_WHEN_SCHEMA_NOT_EXIST`: Throw an error if the table does not exist.  
+`IGNORE`: Ignore the processing of the table.
+
+### data_save_mode[Enum]
+
+Before starting the synchronization task, choose different processing options for the existing data on the target side.
+Option descriptions:  
+`DROP_DATA`: Retain the database schema but delete the data.  
+`APPEND_DATA`: Retain the database schema and the data.  
+`CUSTOM_PROCESSING`: Custom user-defined processing.  
+`ERROR_WHEN_DATA_EXISTS`: Throw an error if data exists.
+
+### save_mode_create_template
+
+Automatically create Clickhouse tables using templates.
+The table creation statements will be generated based on the upstream data types and schema. The default template can be modified as needed.
+
+Default template:
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+    ${rowtype_primary_key},
+    ${rowtype_fields}
+) ENGINE = MergeTree()
+ORDER BY (${rowtype_primary_key})
+PRIMARY KEY (${rowtype_primary_key})
+SETTINGS
+    index_granularity = 8192;
+```
+
+If custom fields are added to the template, for example, adding an `id` field:
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+    id,
+    ${rowtype_fields}
+) ENGINE = MergeTree()
+    ORDER BY (${rowtype_primary_key})
+    PRIMARY KEY (${rowtype_primary_key})
+    SETTINGS
+    index_granularity = 8192;
+```
+
+The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes.
+
+The following placeholders can be used:
+
+- `database`: Retrieves the database from the upstream schema.
+- `table_name`: Retrieves the table name from the upstream schema.
+- `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Clickhouse field descriptions.
+- `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list).
+- `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list).
+
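[Editor's note, not part of this commit's diff: a minimal sketch of a sink configuration that exercises the new save mode options documented above. Option names come from the table above; the host, database, table, and credential values are placeholders.]

```hocon
sink {
  Clickhouse {
    host = "clickhouse-host:8123"   # placeholder address
    database = "default"            # placeholder target database
    table = "sink_table"            # placeholder target table
    username = "default"
    password = ""
    # Create the target table from the upstream schema if it does not exist yet.
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # Keep existing rows and append the newly written data.
    data_save_mode = "APPEND_DATA"
  }
}
```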
 ## How to Create a Clickhouse Data Synchronization Jobs
 
 The following example demonstrates how to create a data synchronization job that writes randomly generated data to a Clickhouse database:
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index 938471c862..527a94fc94 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -203,7 +203,7 @@ source {
 
 sink {
   Assert {
-    source_table_name = hive_source
+    plugin_input = hive_source
     rules {
       row_rules = [
         {
diff --git a/docs/zh/connector-v2/sink/Clickhouse.md 
b/docs/zh/connector-v2/sink/Clickhouse.md
index 61a359f5c0..41e9cefc60 100644
--- a/docs/zh/connector-v2/sink/Clickhouse.md
+++ b/docs/zh/connector-v2/sink/Clickhouse.md
@@ -42,7 +42,7 @@
 | ARRAY          | Array |
 | MAP            | Map |
 
-## 输出选项
+## Sink 选项
 
 |                  名称                   |   类型    | 是否必须 |  默认值  | 描述 |
 |---------------------------------------|---------|------|-------|------|
@@ -58,8 +58,71 @@
 | primary_key                           | String  | No   | -     | 标记`clickhouse`表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到`clickhouse`表. |
 | support_upsert                        | Boolean | No   | false | 支持按查询主键更新插入行. |
 | allow_experimental_lightweight_delete | Boolean | No   | false | 允许基于`MergeTree`表引擎实验性轻量级删除. |
+| schema_save_mode                      | Enum    | no    | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode` |
+| data_save_mode                        | Enum    | no    | APPEND_DATA | 数据保存模式,请参考下面的`data_save_mode`。 |
+| save_mode_create_template             | string  | no    | see below | 见下文。 |
 | common-options                        |         | No   | -     | Sink插件查用参数,详见[Sink常用选项](../sink-common-options.md). |
 
+### schema_save_mode[Enum]
+
+在开启同步任务之前,针对现有的表结构选择不同的处理方案。
+选项介绍:  
+`RECREATE_SCHEMA` :表不存在时创建,表保存时删除并重建。
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :表不存在时会创建,表存在时跳过。  
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :表不存在时会报错。  
+`IGNORE` :忽略对表的处理。
+
+### data_save_mode[Enum]
+
+在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
+选项介绍:  
+`DROP_DATA`: 保留数据库结构并删除数据。  
+`APPEND_DATA`:保留数据库结构,保留数据。  
+`CUSTOM_PROCESSING`:用户自定义处理。  
+`ERROR_WHEN_DATA_EXISTS`:有数据时报错。
+
+### save_mode_create_template
+
+使用模板自动创建Clickhouse表,
+会根据上游数据类型和schema类型创建相应的建表语句,
+默认模板可以根据情况进行修改。
+
+默认模板:
+```sql
+CREATE TABLE IF NOT EXISTS  `${database}`.`${table}` (
+    ${rowtype_primary_key},
+    ${rowtype_fields}
+) ENGINE = MergeTree()
+ORDER BY (${rowtype_primary_key})
+PRIMARY KEY (${rowtype_primary_key})
+SETTINGS
+    index_granularity = 8192;
+```
+
+如果模板中填写了自定义字段,例如添加 id 字段
+
+```sql
+CREATE TABLE IF NOT EXISTS  `${database}`.`${table}` (
+    id,
+    ${rowtype_fields}
+) ENGINE = MergeTree()
+    ORDER BY (${rowtype_primary_key})
+    PRIMARY KEY (${rowtype_primary_key})
+    SETTINGS
+    index_granularity = 8192;
+```
+
+连接器会自动从上游获取对应类型完成填充,
+并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
+
+可以使用以下占位符:
+
+- database:用于获取上游schema中的数据库。
+- table_name:用于获取上游schema中的表名。
+- rowtype_fields:用于获取上游schema中的所有字段,自动映射到Clickhouse的字段描述。
+- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。
+- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。
+
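[Editor's note, not part of this commit's diff: a sketch of a sink configuration that combines the save mode options above with the `custom_sql` option this commit adds to ClickhouseConfig. All connection values and the SQL statement are placeholders; the pairing of modes is illustrative only.]

```hocon
sink {
  Clickhouse {
    host = "clickhouse-host:8123"   # placeholder address
    database = "default"            # placeholder target database
    table = "sink_table"            # placeholder target table
    username = "default"
    password = ""
    # Drop and recreate the target table before writing.
    schema_save_mode = "RECREATE_SCHEMA"
    # Run a user-defined statement for data handling before writing.
    data_save_mode = "CUSTOM_PROCESSING"
    custom_sql = "TRUNCATE TABLE default.sink_table"   # placeholder custom SQL
  }
}
```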
 ## 如何创建一个clickhouse 同步任务
 
 以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。
diff --git a/docs/zh/connector-v2/source/Hive.md 
b/docs/zh/connector-v2/source/Hive.md
index f1bec9fd95..094d701b45 100644
--- a/docs/zh/connector-v2/source/Hive.md
+++ b/docs/zh/connector-v2/source/Hive.md
@@ -187,7 +187,7 @@ source {
     table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
     metastore_uri = "thrift://metastore:9083"
     hive.hadoop.conf-path = "/tmp/hadoop"
-    result_table_name = hive_source
+    plugin_output = hive_source
     hive_site_path = "/tmp/hive-site.xml"
     kerberos_principal = "hive/metastore.seatun...@example.com"
     kerberos_keytab_path = "/tmp/hive.keytab"
@@ -197,7 +197,7 @@ source {
 
 sink {
   Assert {
-    source_table_name = hive_source
+    plugin_input = hive_source
     rules {
       row_rules = [
         {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index 002b60c49b..7c550b3cc3 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -30,7 +30,10 @@ public enum SeaTunnelAPIErrorCode implements 
SeaTunnelErrorCode {
     TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
     HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
     SOURCE_ALREADY_HAS_DATA("API-10", "The target data source already has 
data"),
-    SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist");
+    SINK_TABLE_NOT_EXIST("API-11", "The sink table not exist"),
+    LIST_DATABASES_FAILED("API-12", "List databases failed"),
+    LIST_TABLES_FAILED("API-13", "List tables failed"),
+    GET_PRIMARY_KEY_FAILED("API-14", "Get primary key failed");
 
     private final String code;
     private final String description;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/pom.xml 
b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
index 22d2565a63..93ffad1d60 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/pom.xml
+++ b/seatunnel-connectors-v2/connector-clickhouse/pom.xml
@@ -68,6 +68,11 @@
             <version>${project.version}</version>
             <classifier>optional</classifier>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java
new file mode 100644
index 0000000000..4c7bba896e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.SQLPreviewResult;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.clickhouse.client.ClickHouseColumn;
+import com.clickhouse.client.ClickHouseNode;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class ClickhouseCatalog implements Catalog {
+
+    protected String defaultDatabase = "information_schema";
+    private ReadonlyConfig readonlyConfig;
+    private ClickhouseProxy proxy;
+    private final String template;
+
+    private String catalogName;
+
+    public ClickhouseCatalog(ReadonlyConfig readonlyConfig, String 
catalogName) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogName = catalogName;
+        this.template = readonlyConfig.get(SAVE_MODE_CREATE_TEMPLATE);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return proxy.listDatabases();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(this.catalogName, 
databaseName);
+        }
+
+        return proxy.listTable(databaseName);
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(catalogName, tablePath);
+        }
+        List<ClickHouseColumn> clickHouseColumns =
+                proxy.getClickHouseColumns(tablePath.getFullNameWithQuoted());
+
+        try {
+            Optional<PrimaryKey> primaryKey =
+                    proxy.getPrimaryKey(tablePath.getDatabaseName(), 
tablePath.getTableName());
+
+            TableSchema.Builder builder = TableSchema.builder();
+            primaryKey.ifPresent(builder::primaryKey);
+            buildColumnsWithErrorCheck(
+                    tablePath,
+                    builder,
+                    clickHouseColumns.iterator(),
+                    column ->
+                            PhysicalColumn.of(
+                                    column.getColumnName(),
+                                    TypeConvertUtil.convert(column),
+                                    (long) column.getEstimatedLength(),
+                                    column.getScale(),
+                                    column.isNullable(),
+                                    null,
+                                    null));
+
+            TableIdentifier tableIdentifier =
+                    TableIdentifier.of(
+                            catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName());
+            return CatalogTable.of(
+                    tableIdentifier,
+                    builder.build(),
+                    buildConnectorOptions(tablePath),
+                    Collections.emptyList(),
+                    "");
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", 
tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
+        log.debug("Create table :{}.{}", tablePath.getDatabaseName(), 
tablePath.getTableName());
+        proxy.createTable(
+                tablePath.getDatabaseName(),
+                tablePath.getTableName(),
+                template,
+                table.getTableSchema());
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        proxy.dropTable(tablePath, ignoreIfNotExists);
+    }
+
+    @Override
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        try {
+            if (tableExists(tablePath)) {
+                proxy.truncateTable(tablePath, ignoreIfNotExists);
+            }
+        } catch (Exception e) {
+            throw new CatalogException("Truncate table failed", e);
+        }
+    }
+
+    @Override
+    public void executeSql(TablePath tablePath, String sql) {
+        try {
+            proxy.executeSql(sql);
+        } catch (Exception e) {
+            throw new CatalogException(String.format("Failed EXECUTE SQL in 
catalog %s", sql), e);
+        }
+    }
+
+    @Override
+    public boolean isExistsData(TablePath tablePath) {
+        try {
+            return proxy.isExistsData(tablePath.getFullName());
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        proxy.createDatabase(tablePath.getDatabaseName(), ignoreIfExists);
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        proxy.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists);
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+        Map<String, String> options = new HashMap<>(8);
+        options.put("connector", "clickhouse");
+        options.put("host", readonlyConfig.get(HOST));
+        options.put("database", tablePath.getDatabaseName());
+        return options;
+    }
+
+    @Override
+    public String getDefaultDatabase() {
+        return defaultDatabase;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(readonlyConfig);
+        Properties clickhouseProperties = new Properties();
+        readonlyConfig
+                .get(CLICKHOUSE_CONFIG)
+                .forEach((key, value) -> clickhouseProperties.put(key, 
String.valueOf(value)));
+
+        clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+        clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+        proxy = new ClickhouseProxy(nodes.get(0));
+    }
+
+    @Override
+    public void close() throws CatalogException {}
+
+    @Override
+    public String name() {
+        return catalogName;
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        checkArgument(StringUtils.isNotBlank(databaseName));
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        return proxy.tableExists(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    public PreviewResult previewAction(
+            ActionType actionType, TablePath tablePath, Optional<CatalogTable> 
catalogTable) {
+        if (actionType == ActionType.CREATE_TABLE) {
+            Preconditions.checkArgument(catalogTable.isPresent(), 
"CatalogTable cannot be null");
+            return new SQLPreviewResult(
+                    ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                            template,
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName(),
+                            catalogTable.get().getTableSchema(),
+                            ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
+        } else if (actionType == ActionType.DROP_TABLE) {
+            return new SQLPreviewResult(
+                    ClickhouseCatalogUtil.INSTANCE.getDropTableSql(tablePath, 
true));
+        } else if (actionType == ActionType.TRUNCATE_TABLE) {
+            return new SQLPreviewResult(
+                    
ClickhouseCatalogUtil.INSTANCE.getTruncateTableSql(tablePath));
+        } else if (actionType == ActionType.CREATE_DATABASE) {
+            return new SQLPreviewResult(
+                    ClickhouseCatalogUtil.INSTANCE.getCreateDatabaseSql(
+                            tablePath.getDatabaseName(), true));
+        } else if (actionType == ActionType.DROP_DATABASE) {
+            return new SQLPreviewResult(
+                    ClickhouseCatalogUtil.INSTANCE.getDropDatabaseSql(
+                            tablePath.getDatabaseName(), true));
+        } else {
+            throw new UnsupportedOperationException("Unsupported action type: 
" + actionType);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java
new file mode 100644
index 0000000000..12e7c8490b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ClickhouseCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "clickhouse";
+
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        return new ClickhouseCatalog(options, catalogName);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ClickhouseConfig.HOST)
+                .required(ClickhouseConfig.DATABASE)
+                .required(ClickhouseConfig.USERNAME)
+                .required(ClickhouseConfig.PASSWORD)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java
new file mode 100644
index 0000000000..6259d2f907
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseTypeConverter.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeConverter;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonError;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class ClickhouseTypeConverter
+        implements BasicTypeConverter<BasicTypeDefine<ClickhouseType>> {
+    public static final ClickhouseTypeConverter INSTANCE = new 
ClickhouseTypeConverter();
+    public static final Integer MAX_DATETIME_SCALE = 9;
+    public static final String IDENTIFIER = "Clickhouse";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Column convert(BasicTypeDefine<ClickhouseType> typeDefine) {
+        throw new UnsupportedOperationException("Unsupported operation");
+    }
+
+    @Override
+    public BasicTypeDefine<ClickhouseType> reconvert(Column column) {
+        BasicTypeDefine.BasicTypeDefineBuilder builder =
+                BasicTypeDefine.builder()
+                        .name(column.getName())
+                        .nullable(column.isNullable())
+                        .comment(column.getComment())
+                        .defaultValue(column.getDefaultValue());
+
+        switch (column.getDataType().getSqlType()) {
+            case BOOLEAN:
+                builder.columnType(ClickhouseType.BOOLEAN);
+                builder.dataType(ClickhouseType.BOOLEAN);
+                break;
+            case TINYINT:
+                builder.columnType(ClickhouseType.TINYINT);
+                builder.dataType(ClickhouseType.TINYINT);
+                break;
+            case SMALLINT:
+                builder.columnType(ClickhouseType.SMALLINT);
+                builder.dataType(ClickhouseType.SMALLINT);
+                break;
+            case INT:
+                builder.columnType(ClickhouseType.INT);
+                builder.dataType(ClickhouseType.INT);
+                break;
+            case BIGINT:
+                builder.columnType(ClickhouseType.BIGINT);
+                builder.dataType(ClickhouseType.BIGINT);
+                break;
+            case FLOAT:
+                builder.columnType(ClickhouseType.FLOAT);
+                builder.dataType(ClickhouseType.FLOAT);
+                break;
+            case DOUBLE:
+                builder.columnType(ClickhouseType.DOUBLE);
+                builder.dataType(ClickhouseType.DOUBLE);
+                break;
+            case DATE:
+                builder.columnType(ClickhouseType.DATE);
+                builder.dataType(ClickhouseType.DATE);
+                break;
+            case TIME:
+            case STRING:
+                builder.columnType(ClickhouseType.STRING);
+                builder.dataType(ClickhouseType.STRING);
+                break;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) column.getDataType();
+                builder.columnType(
+                        String.format(
+                                "%s(%s, %s)",
+                                ClickhouseType.DECIMAL,
+                                decimalType.getPrecision(),
+                                decimalType.getScale()));
+                builder.dataType(ClickhouseType.DECIMAL);
+                break;
+            case TIMESTAMP:
+                if (column.getScale() != null
+                        && column.getScale() > 0
+                        && column.getScale() <= MAX_DATETIME_SCALE) {
+                    builder.columnType(
+                            String.format("%s(%s)", ClickhouseType.DateTime64, 
column.getScale()));
+                    builder.scale(column.getScale());
+                } else {
+                    builder.columnType(String.format("%s(%s)", 
ClickhouseType.DateTime64, 0));
+                    builder.scale(0);
+                }
+                builder.dataType(ClickhouseType.DateTime64);
+                break;
+            case MAP:
+                MapType dataType = (MapType) column.getDataType();
+                SeaTunnelDataType keyType = dataType.getKeyType();
+                SeaTunnelDataType valueType = dataType.getValueType();
+                Column keyColumn =
+                        PhysicalColumn.of(
+                                column.getName() + ".key",
+                                (SeaTunnelDataType<?>) keyType,
+                                (Long) null,
+                                true,
+                                null,
+                                null);
+                String keyColumnType = reconvert(keyColumn).getColumnType();
+                Column valueColumn =
+                        PhysicalColumn.of(
+                                column.getName() + ".value",
+                                (SeaTunnelDataType<?>) valueType,
+                                (Long) null,
+                                true,
+                                null,
+                                null);
+                String valueColumnType = 
reconvert(valueColumn).getColumnType();
+
+                builder.dataType(ClickhouseType.MAP);
+                builder.columnType(
+                        String.format(
+                                "%s(%s, %s)", ClickhouseType.MAP, 
keyColumnType, valueColumnType));
+                break;
+            case ARRAY:
+                SeaTunnelDataType<?> arrayDataType = column.getDataType();
+                SeaTunnelDataType elementType = null;
+                if (arrayDataType instanceof ArrayType) {
+                    ArrayType arrayType = (ArrayType) arrayDataType;
+                    elementType = arrayType.getElementType();
+                }
+
+                Column arrayKeyColumn =
+                        PhysicalColumn.of(
+                                column.getName() + ".key",
+                                (SeaTunnelDataType<?>) elementType,
+                                (Long) null,
+                                true,
+                                null,
+                                null);
+                String arrayKeyColumnType = 
reconvert(arrayKeyColumn).getColumnType();
+                builder.dataType(ClickhouseType.ARRAY);
+                builder.columnType(
+                        String.format("%s(%s)", ClickhouseType.ARRAY, 
arrayKeyColumnType));
+                break;
+            default:
+                throw CommonError.convertToConnectorTypeError(
+                        IDENTIFIER, column.getDataType().getSqlType().name(), 
column.getName());
+        }
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index bb0417b171..1408430149 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -19,6 +19,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import java.time.ZoneId;
 import java.util.Collections;
@@ -178,4 +181,48 @@ public class ClickhouseConfig {
                     .defaultValue("/tmp/seatunnel/clickhouse-local/file")
                     .withDescription(
                             "The directory where ClickhouseFile stores 
temporary files locally.");
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription(
+                            "different treatment schemes are selected for the 
existing surface structure of the target side");
+
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .enumType(DataSaveMode.class)
+                    .defaultValue(DataSaveMode.APPEND_DATA)
+                    .withDescription(
+                            "different processing schemes are selected for 
data existing data on the target side");
+
+    public static final Option<String> CUSTOM_SQL =
+            Options.key("custom_sql")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("when schema_save_mode selects 
CUSTOM_PROCESSING custom SQL");
+
+    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
+            Options.key("save_mode_create_template")
+                    .stringType()
+                    .defaultValue(
+                            "CREATE TABLE IF NOT EXISTS `"
+                                    + 
SaveModePlaceHolder.DATABASE.getPlaceHolder()
+                                    + "`.`"
+                                    + 
SaveModePlaceHolder.TABLE.getPlaceHolder()
+                                    + "` (\n"
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ",\n"
+                                    + 
SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder()
+                                    + "\n"
+                                    + ") ENGINE = MergeTree()\n"
+                                    + "ORDER BY ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")\n"
+                                    + "PRIMARY KEY ("
+                                    + 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
+                                    + ")\n"
+                                    + "SETTINGS\n"
+                                    + "    index_granularity = 8192;")
+                    .withDescription(
+                            "Create table statement template, used to create 
Clickhouse table");
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java
new file mode 100644
index 0000000000..01fb0c57c0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.util.Map;
+
+@Getter
+@AllArgsConstructor
+public class ClickhouseType {
+
+    public static final String STRING = "String";
+    public static final String TINYINT = "Int8";
+    public static final String SMALLINT = "Int16";
+    public static final String INT = "Int32";
+    public static final String BIGINT = "Int64";
+    public static final String FLOAT = "Float32";
+    public static final String BOOLEAN = "Bool";
+    public static final String DOUBLE = "Float64";
+    public static final String DATE = "Date";
+    public static final String DateTime64 = "DateTime64";
+    public static final String MAP = "Map";
+    public static final String ARRAY = "Array";
+    public static final String DECIMAL = "Decimal";
+    private String type;
+    private Map<String, Object> options;
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 22f18694e2..644b078b5b 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -17,30 +17,68 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
+import org.apache.seatunnel.api.sink.SaveModeHandler;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import com.clickhouse.client.ClickHouseNode;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CUSTOM_SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 public class ClickhouseSink
-        implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, 
CKCommitInfo, CKAggCommitInfo> {
+        implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, 
CKCommitInfo, CKAggCommitInfo>,
+                SupportSaveMode {
 
     private ReaderOption option;
     private CatalogTable catalogTable;
 
-    public ClickhouseSink(ReaderOption option, CatalogTable catalogTable) {
-        this.option = option;
+    private ReadonlyConfig readonlyConfig;
+
+    public ClickhouseSink(CatalogTable catalogTable, ReadonlyConfig 
readonlyConfig) {
         this.catalogTable = catalogTable;
+        this.readonlyConfig = readonlyConfig;
     }
 
     @Override
@@ -51,6 +89,76 @@ public class ClickhouseSink
     @Override
     public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
+        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(readonlyConfig);
+        Properties clickhouseProperties = new Properties();
+        readonlyConfig
+                .get(CLICKHOUSE_CONFIG)
+                .forEach((key, value) -> clickhouseProperties.put(key, 
String.valueOf(value)));
+
+        clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
+        clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
+        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
+
+        Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
+        String shardKey = null;
+        String shardKeyType = null;
+        ClickhouseTable table =
+                proxy.getClickhouseTable(readonlyConfig.get(DATABASE), 
readonlyConfig.get(TABLE));
+        if (readonlyConfig.get(SPLIT_MODE)) {
+            if (!"Distributed".equals(table.getEngine())) {
+                throw new ClickhouseConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "split mode only support table which engine is "
+                                + "'Distributed' engine at now");
+            }
+            if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
+                shardKey = readonlyConfig.get(SHARDING_KEY);
+                shardKeyType = tableSchema.get(shardKey);
+            }
+        }
+        ShardMetadata metadata =
+                new ShardMetadata(
+                        shardKey,
+                        shardKeyType,
+                        table.getSortingKey(),
+                        readonlyConfig.get(DATABASE),
+                        readonlyConfig.get(TABLE),
+                        table.getEngine(),
+                        readonlyConfig.get(SPLIT_MODE),
+                        new Shard(1, 1, nodes.get(0)),
+                        readonlyConfig.get(USERNAME),
+                        readonlyConfig.get(PASSWORD));
+        proxy.close();
+        String[] primaryKeys = null;
+        if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
+            String primaryKey = readonlyConfig.get(PRIMARY_KEY);
+            if (primaryKey == null || primaryKey.trim().isEmpty()) {
+                throw new ClickhouseConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, 
"primary_key can not be empty");
+            }
+            if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
+                throw new ClickhouseConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "sharding_key and primary_key must be consistent to 
ensure correct processing of cdc events");
+            }
+            primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
+        }
+        boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
+        boolean allowExperimentalLightweightDelete =
+                readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
+
+        ReaderOption option =
+                ReaderOption.builder()
+                        .shardMetadata(metadata)
+                        .properties(clickhouseProperties)
+                        .seaTunnelRowType(catalogTable.getSeaTunnelRowType())
+                        .tableEngine(table.getEngine())
+                        .tableSchema(tableSchema)
+                        .bulkSize(readonlyConfig.get(BULK_SIZE))
+                        .primaryKeys(primaryKeys)
+                        .supportUpsert(supportUpsert)
+                        
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
+                        .build();
         return new ClickhouseSinkWriter(option, context);
     }
 
@@ -69,4 +177,21 @@ public class ClickhouseSink
     public Optional<CatalogTable> getWriteCatalogTable() {
         return Optional.of(catalogTable);
     }
+
+    @Override
+    public Optional<SaveModeHandler> getSaveModeHandler() {
+        TablePath tablePath = TablePath.of(readonlyConfig.get(DATABASE), 
readonlyConfig.get(TABLE));
+        ClickhouseCatalog clickhouseCatalog =
+                new ClickhouseCatalog(readonlyConfig, 
ClickhouseCatalogFactory.IDENTIFIER);
+        SchemaSaveMode schemaSaveMode = 
readonlyConfig.get(ClickhouseConfig.SCHEMA_SAVE_MODE);
+        DataSaveMode dataSaveMode = 
readonlyConfig.get(ClickhouseConfig.DATA_SAVE_MODE);
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        schemaSaveMode,
+                        dataSaveMode,
+                        clickhouseCatalog,
+                        tablePath,
+                        catalogTable,
+                        readonlyConfig.get(CUSTOM_SQL)));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
index edc36eabba..0640ba59fa 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkFactory.java
@@ -24,27 +24,9 @@ import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
-import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
 
-import com.clickhouse.client.ClickHouseNode;
 import com.google.auto.service.AutoService;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
@@ -66,87 +48,10 @@ public class ClickhouseSinkFactory implements 
TableSinkFactory {
     }
 
     @Override
-    public TableSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, 
CKAggCommitInfo> createSink(
-            TableSinkFactoryContext context) {
+    public TableSink createSink(TableSinkFactoryContext context) {
         ReadonlyConfig readonlyConfig = context.getOptions();
         CatalogTable catalogTable = context.getCatalogTable();
-        List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(readonlyConfig);
-        Properties clickhouseProperties = new Properties();
-        readonlyConfig
-                .get(CLICKHOUSE_CONFIG)
-                .forEach((key, value) -> clickhouseProperties.put(key, 
String.valueOf(value)));
-
-        clickhouseProperties.put("user", readonlyConfig.get(USERNAME));
-        clickhouseProperties.put("password", readonlyConfig.get(PASSWORD));
-        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
-        try {
-            Map<String, String> tableSchema =
-                    proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE));
-            String shardKey = null;
-            String shardKeyType = null;
-            ClickhouseTable table =
-                    proxy.getClickhouseTable(
-                            readonlyConfig.get(DATABASE), 
readonlyConfig.get(TABLE));
-            if (readonlyConfig.get(SPLIT_MODE)) {
-                if (!"Distributed".equals(table.getEngine())) {
-                    throw new ClickhouseConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "split mode only support table which engine is "
-                                    + "'Distributed' engine at now");
-                }
-                if (readonlyConfig.getOptional(SHARDING_KEY).isPresent()) {
-                    shardKey = readonlyConfig.get(SHARDING_KEY);
-                    shardKeyType = tableSchema.get(shardKey);
-                }
-            }
-            ShardMetadata metadata =
-                    new ShardMetadata(
-                            shardKey,
-                            shardKeyType,
-                            table.getSortingKey(),
-                            readonlyConfig.get(DATABASE),
-                            readonlyConfig.get(TABLE),
-                            table.getEngine(),
-                            readonlyConfig.get(SPLIT_MODE),
-                            new Shard(1, 1, nodes.get(0)),
-                            readonlyConfig.get(USERNAME),
-                            readonlyConfig.get(PASSWORD));
-            proxy.close();
-            String[] primaryKeys = null;
-            if (readonlyConfig.getOptional(PRIMARY_KEY).isPresent()) {
-                String primaryKey = readonlyConfig.get(PRIMARY_KEY);
-                if (primaryKey == null || primaryKey.trim().isEmpty()) {
-                    throw new ClickhouseConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "primary_key can not be empty");
-                }
-                if (shardKey != null && !Objects.equals(primaryKey, shardKey)) 
{
-                    throw new ClickhouseConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "sharding_key and primary_key must be consistent 
to ensure correct processing of cdc events");
-                }
-                primaryKeys = primaryKey.replaceAll("\\s+", "").split(",");
-            }
-            boolean supportUpsert = readonlyConfig.get(SUPPORT_UPSERT);
-            boolean allowExperimentalLightweightDelete =
-                    readonlyConfig.get(ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE);
-
-            ReaderOption option =
-                    ReaderOption.builder()
-                            .shardMetadata(metadata)
-                            .properties(clickhouseProperties)
-                            
.seaTunnelRowType(catalogTable.getSeaTunnelRowType())
-                            .tableEngine(table.getEngine())
-                            .tableSchema(tableSchema)
-                            .bulkSize(readonlyConfig.get(BULK_SIZE))
-                            .primaryKeys(primaryKeys)
-                            .supportUpsert(supportUpsert)
-                            
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
-                            .build();
-            return () -> new ClickhouseSink(option, catalogTable);
-        } finally {
-            proxy.close();
-        }
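+        // ReaderOption construction has moved into ClickhouseSink; the factory now only
+        // passes the catalog table and config through.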
+        return () -> new ClickhouseSink(catalogTable, readonlyConfig);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
new file mode 100644
index 0000000000..bf4e02c3fb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ClickhouseCatalogUtil extends CatalogUtil {
+
+    public static final ClickhouseCatalogUtil INSTANCE = new 
ClickhouseCatalogUtil();
+
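+    /** Renders one column definition, e.g. "`name` String COMMENT '...'", using the ClickHouse type converter. */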
+    public String columnToConnectorType(Column column) {
+        checkNotNull(column, "The column is required.");
+        return String.format(
+                "`%s` %s %s",
+                column.getName(),
+                
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(),
+                StringUtils.isEmpty(column.getComment())
+                        ? ""
+                        : "COMMENT '" + column.getComment() + "'");
+    }
+
+    public String getDropTableSql(TablePath tablePath, boolean 
ignoreIfNotExists) {
+        if (ignoreIfNotExists) {
+            return "DROP TABLE IF EXISTS "
+                    + tablePath.getDatabaseName()
+                    + "."
+                    + tablePath.getTableName();
+        } else {
+            return "DROP TABLE " + tablePath.getDatabaseName() + "." + 
tablePath.getTableName();
+        }
+    }
+
+    public String getTruncateTableSql(TablePath tablePath) {
+        return "TRUNCATE TABLE " + tablePath.getDatabaseName() + "." + 
tablePath.getTableName();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index c417818257..be48f728b1 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -18,28 +18,39 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 
 import com.clickhouse.client.ClickHouseClient;
+import com.clickhouse.client.ClickHouseColumn;
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseRecord;
 import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+@Slf4j
 @SuppressWarnings("magicnumber")
 public class ClickhouseProxy {
 
@@ -153,6 +164,19 @@ public class ClickhouseProxy {
         return schema;
     }
 
+    public List<ClickHouseColumn> getClickHouseColumns(String table) {
+        String sql = "desc " + table;
+        try (ClickHouseResponse response = 
this.clickhouseRequest.query(sql).executeAndWait()) {
+            return response.getColumns();
+
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
+                    "Cannot get table schema from clickhouse",
+                    e);
+        }
+    }
+
     /**
      * Get the shard of the given cluster.
      *
@@ -267,6 +291,137 @@ public class ClickhouseProxy {
         }
     }
 
+    public boolean tableExists(String database, String table) {
+        String sql =
+                String.format(
+                        "select count(1) from system.tables where database = 
'%s' and name = '%s'",
+                        database, table);
+        try (ClickHouseResponse response = 
clickhouseRequest.query(sql).executeAndWait()) {
+            return response.firstRecord().getValue(0).asInteger() > 0;
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table 
from clickhouse", e);
+        }
+    }
+
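+    /** Lists every database that owns at least one table, via system.tables. */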
+    public List<String> listDatabases() {
+        String sql = "select distinct database from system.tables";
+        try (ClickHouseResponse response = 
clickhouseRequest.query(sql).executeAndWait()) {
+            Iterable<ClickHouseRecord> records = response.records();
+            return StreamSupport.stream(records.spliterator(), false)
+                    .map(r -> r.getValue(0).asString())
+                    .collect(Collectors.toList());
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.LIST_DATABASES_FAILED,
+                    "Cannot list databases from clickhouse",
+                    e);
+        }
+    }
+
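+    /** Lists the tables of the given database, via system.tables. */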
+    public List<String> listTable(String database) {
+        String sql = "SELECT name FROM system.tables WHERE database = '" + 
database + "'";
+        try (ClickHouseResponse response = 
clickhouseRequest.query(sql).executeAndWait()) {
+            Iterable<ClickHouseRecord> records = response.records();
+            return StreamSupport.stream(records.spliterator(), false)
+                    .map(r -> r.getValue(0).asString())
+                    .collect(Collectors.toList());
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.LIST_TABLES_FAILED,
+                    "Cannot list tables from clickhouse",
+                    e);
+        }
+    }
+
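+    /** Executes a statement synchronously; interruption or execution failures are rethrown as RuntimeException. */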
+    public void executeSql(String sql) {
+        try {
+            clickhouseRequest
+                    .write()
+                    .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+                    .query(sql)
+                    .execute()
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
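+    /** Renders the configured create-table template for the target table and executes the resulting DDL. */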
+    public void createTable(
+            String database, String table, String template, TableSchema 
tableSchema) {
+        String createTableSql =
+                ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                        template,
+                        database,
+                        table,
+                        tableSchema,
+                        ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+        log.debug("Create Clickhouse table sql: {}", createTableSql);
+        executeSql(createTableSql);
+    }
+
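+    /** Rebuilds the primary key from the system.columns entries flagged is_in_primary_key, ordered by position. */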
+    public Optional<PrimaryKey> getPrimaryKey(String schema, String table) 
throws SQLException {
+
+        List<String> pkFields;
+        String sql =
+                "SELECT\n"
+                        + "    name as column_name\n"
+                        + "FROM system.columns\n"
+                        + "WHERE table = '"
+                        + table
+                        + "'\n"
+                        + "  AND database = '"
+                        + schema
+                        + "'\n"
+                        + "  AND is_in_primary_key = 1\n"
+                        + "ORDER BY position;";
+        try (ClickHouseResponse response = 
clickhouseRequest.query(sql).executeAndWait()) {
+            Iterable<ClickHouseRecord> records = response.records();
+            pkFields =
+                    StreamSupport.stream(records.spliterator(), false)
+                            .map(r -> r.getValue(0).asString())
+                            .collect(Collectors.toList());
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.GET_PRIMARY_KEY_FAILED,
+                    "Cannot get primary key from clickhouse",
+                    e);
+        }
+        if (!pkFields.isEmpty()) {
+            // ClickHouse does not expose a primary key name, so derive one from the key columns
+            String pkName = "pk_" + String.join("_", pkFields);
+            return Optional.of(PrimaryKey.of(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
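+    /** Returns true if the table already contains at least one row. */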
+    public boolean isExistsData(String tableName) throws ExecutionException, 
InterruptedException {
+        // SQL statement that checks whether the table already contains any rows
+        String queryDataSql = "SELECT count(*) FROM " + tableName;
+        try (ClickHouseResponse response = 
clickhouseRequest.query(queryDataSql).executeAndWait()) {
+            return response.firstRecord().getValue(0).asInteger() > 0;
+        } catch (ClickHouseException e) {
+            throw new ClickhouseConnectorException(
+                    SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot query data from clickhouse table", e);
+        }
+    }
+
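+    // The helpers below delegate SQL generation to ClickhouseCatalogUtil and execute the result.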
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) {
+        executeSql(ClickhouseCatalogUtil.INSTANCE.getDropTableSql(tablePath, 
ignoreIfNotExists));
+    }
+
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
+        
executeSql(ClickhouseCatalogUtil.INSTANCE.getTruncateTableSql(tablePath));
+    }
+
+    public void createDatabase(String database, boolean ignoreIfExists) {
+        
executeSql(ClickhouseCatalogUtil.INSTANCE.getCreateDatabaseSql(database, 
ignoreIfExists));
+    }
+
+    public void dropDatabase(String database, boolean ignoreIfNotExists) {
+        executeSql(ClickhouseCatalogUtil.INSTANCE.getDropDatabaseSql(database, 
ignoreIfNotExists));
+    }
+
     public void close() {
         if (this.client != null) {
             this.client.close();
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
rename to 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
index 6986967f7a..f3d49d8aef 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/util/CreateTableParser.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/CreateTableParser.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks.util;
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import lombok.Getter;
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
new file mode 100644
index 0000000000..5728b18bcf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+
+import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseCatalogUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ClickhouseCreateTableTest {
+
+    @Test
+    public void test() {
+        List<Column> columns = new ArrayList<>();
+
+        columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, 
true, null, ""));
+        columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) 
null, true, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "age", BasicType.INT_TYPE, (Long) null, true, null, 
"test comment"));
+        columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) 
null, true, null, ""));
+        columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) 
null, true, null, ""));
+        columns.add(
+                PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) 
null, true, null, ""));
+
+        String createTableSql =
+                ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                        "CREATE TABLE IF NOT EXISTS  `${database}`.`${table}` 
(\n"
+                                + "    ${rowtype_primary_key},\n"
+                                + "    ${rowtype_fields}\n"
+                                + ") ENGINE = MergeTree()\n"
+                                + "ORDER BY (${rowtype_primary_key})\n"
+                                + "PRIMARY KEY (${rowtype_primary_key})\n"
+                                + "SETTINGS\n"
+                                + "    index_granularity = 8192;",
+                        "test1",
+                        "test2",
+                        TableSchema.builder()
+                                .primaryKey(PrimaryKey.of("", 
Arrays.asList("id", "age")))
+                                .constraintKey(
+                                        Arrays.asList(
+                                                ConstraintKey.of(
+                                                        
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                                        "unique_key",
+                                                        
Collections.singletonList(
+                                                                
ConstraintKey.ConstraintKeyColumn
+                                                                        .of(
+                                                                               
 "name",
+                                                                               
 ConstraintKey
+                                                                               
         .ColumnSortType
+                                                                               
         .DESC))),
+                                                ConstraintKey.of(
+                                                        
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                                        "unique_key2",
+                                                        
Collections.singletonList(
+                                                                
ConstraintKey.ConstraintKeyColumn
+                                                                        .of(
+                                                                               
 "score",
+                                                                               
 ConstraintKey
+                                                                               
         .ColumnSortType
+                                                                               
         .ASC)))))
+                                .columns(columns)
+                                .build(),
+                        ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+        Assertions.assertEquals(
+                createTableSql,
+                "CREATE TABLE IF NOT EXISTS  `test1`.`test2` (\n"
+                        + "    `id` Int64 ,`age` Int32 COMMENT 'test 
comment',\n"
+                        + "    `name` String ,\n"
+                        + "`score` Int32 ,\n"
+                        + "`gender` Int8 ,\n"
+                        + "`create_time` Int64 \n"
+                        + ") ENGINE = MergeTree()\n"
+                        + "ORDER BY (`id`,`age`)\n"
+                        + "PRIMARY KEY (`id`,`age`)\n"
+                        + "SETTINGS\n"
+                        + "    index_granularity = 8192;");
+        System.out.println(createTableSql);
+
+        String createTemplate = 
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .primaryKey(PrimaryKey.of(StringUtils.EMPTY, 
Collections.emptyList()))
+                        .constraintKey(Collections.emptyList())
+                        .columns(columns)
+                        .build();
+        TablePath tablePath = TablePath.of("test1.test2");
+        SeaTunnelRuntimeException actualSeaTunnelRuntimeException =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                                        createTemplate,
+                                        "test1",
+                                        "test2",
+                                        tableSchema,
+                                        
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
+
+        String primaryKeyHolder = 
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
+        SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
+                CommonError.sqlTemplateHandledError(
+                        tablePath.getFullName(),
+                        SaveModePlaceHolder.getDisplay(primaryKeyHolder),
+                        createTemplate,
+                        primaryKeyHolder,
+                        ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+        Assertions.assertEquals(
+                exceptSeaTunnelRuntimeException.getMessage(),
+                actualSeaTunnelRuntimeException.getMessage());
+    }
+
+    @Test
+    public void testInSeq() {
+
+        List<Column> columns = new ArrayList<>();
+
+        columns.add(
+                PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_LINENUMBER", BasicType.INT_TYPE, (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_QUANTITY", new DecimalType(15, 2), (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_EXTENDEDPRICE", new DecimalType(15, 2), (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_DISCOUNT", new DecimalType(15, 2), (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_TAX", new DecimalType(15, 2), (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_RETURNFLAG", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_LINESTATUS", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, (Long) 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_COMMITDATE",
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        (Long) null,
+                        false,
+                        null,
+                        ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_RECEIPTDATE",
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        (Long) null,
+                        false,
+                        null,
+                        ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_SHIPINSTRUCT", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_SHIPMODE", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_COMMENT", BasicType.STRING_TYPE, (Long) null, 
false, null, ""));
+
+        String result =
+                ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                        "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` 
(\n"
+                                + "`L_COMMITDATE`,\n"
+                                + "${rowtype_primary_key},\n"
+                                + "L_SUPPKEY BIGINT NOT NULL,\n"
+                                + "${rowtype_fields}\n"
+                                + ") ENGINE=MergeTree()\n"
+                                + " ORDER BY (L_COMMITDATE, 
${rowtype_primary_key}, L_SUPPKEY)\n"
+                                + " PRIMARY KEY (L_COMMITDATE, 
${rowtype_primary_key}, L_SUPPKEY)\n"
+                                + "SETTINGS\n"
+                                + "    index_granularity = 8192;",
+                        "tpch",
+                        "lineitem",
+                        TableSchema.builder()
+                                .primaryKey(
+                                        PrimaryKey.of(
+                                                "", 
Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
+                                .columns(columns)
+                                .build(),
+                        ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
+        String expected =
+                "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+                        + "`L_COMMITDATE` Date ,\n"
+                        + "`L_ORDERKEY` Int32 ,`L_LINENUMBER` Int32 ,\n"
+                        + "L_SUPPKEY BIGINT NOT NULL,\n"
+                        + "`L_PARTKEY` Int32 ,\n"
+                        + "`L_QUANTITY` Decimal(15, 2) ,\n"
+                        + "`L_EXTENDEDPRICE` Decimal(15, 2) ,\n"
+                        + "`L_DISCOUNT` Decimal(15, 2) ,\n"
+                        + "`L_TAX` Decimal(15, 2) ,\n"
+                        + "`L_RETURNFLAG` String ,\n"
+                        + "`L_LINESTATUS` String ,\n"
+                        + "`L_SHIPDATE` Date ,\n"
+                        + "`L_RECEIPTDATE` Date ,\n"
+                        + "`L_SHIPINSTRUCT` String ,\n"
+                        + "`L_SHIPMODE` String ,\n"
+                        + "`L_COMMENT` String \n"
+                        + ") ENGINE=MergeTree()\n"
+                        + " ORDER BY (L_COMMITDATE, 
`L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+                        + " PRIMARY KEY (L_COMMITDATE, 
`L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+                        + "SETTINGS\n"
+                        + "    index_granularity = 8192;";
+        Assertions.assertEquals(result, expected);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
similarity index 64%
copy from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
copy to 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
index a89f0347e3..7d10260cf5 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
@@ -15,18 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+package org.apache.seatunnel.connectors.seatunnel.common.util;
 
 import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -38,13 +33,17 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
-
 @Slf4j
-public class StarRocksSaveModeUtil {
+public abstract class CatalogUtil {
+
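+    /** Renders a single column definition in the target engine's DDL dialect. */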
+    public abstract String columnToConnectorType(Column column);
 
-    public static String getCreateTableSql(
-            String template, String database, String table, TableSchema 
tableSchema) {
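+    // Expands the create-table template: fills the primary key, unique key, field list,
+    // database and table placeholders; optionsKey identifies the originating config
+    // option when template validation fails.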
+    public String getCreateTableSql(
+            String template,
+            String database,
+            String table,
+            TableSchema tableSchema,
+            String optionsKey) {
         String primaryKey = "";
         if (tableSchema.getPrimaryKey() != null) {
             primaryKey =
@@ -65,7 +64,7 @@ public class StarRocksSaveModeUtil {
                 SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
                 primaryKey,
                 TablePath.of(database, table).getFullName(),
-                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+                optionsKey);
         template =
                 template.replaceAll(
                         
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
@@ -75,7 +74,7 @@ public class StarRocksSaveModeUtil {
                 SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
                 uniqueKey,
                 TablePath.of(database, table).getFullName(),
-                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+                optionsKey);
 
         template =
                 template.replaceAll(
@@ -87,7 +86,7 @@ public class StarRocksSaveModeUtil {
         String rowTypeFields =
                 tableSchema.getColumns().stream()
                         .filter(column -> 
!columnInTemplate.containsKey(column.getName()))
-                        .map(StarRocksSaveModeUtil::columnToStarrocksType)
+                        .map(x -> columnToConnectorType(x))
                         .collect(Collectors.joining(",\n"));
 
         if 
(template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) {
@@ -105,21 +104,7 @@ public class StarRocksSaveModeUtil {
                         
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
     }
 
-    private static String columnToStarrocksType(Column column) {
-        checkNotNull(column, "The column is required.");
-        return String.format(
-                "`%s` %s %s %s",
-                column.getName(),
-                dataTypeToStarrocksType(
-                        column.getDataType(),
-                        column.getColumnLength() == null ? 0 : 
column.getColumnLength()),
-                column.isNullable() ? "NULL" : "NOT NULL",
-                StringUtils.isEmpty(column.getComment())
-                        ? ""
-                        : "COMMENT '" + column.getComment() + "'");
-    }
-
-    private static String mergeColumnInTemplate(
+    private String mergeColumnInTemplate(
             Map<String, CreateTableParser.ColumnInfo> columnInTemplate,
             TableSchema tableSchema,
             String template) {
@@ -138,7 +123,7 @@ public class StarRocksSaveModeUtil {
             if (StringUtils.isEmpty(columnInfo.getInfo())) {
                 if (columnMap.containsKey(col)) {
                     Column column = columnMap.get(col);
-                    String newCol = columnToStarrocksType(column);
+                    String newCol = columnToConnectorType(column);
                     String prefix = template.substring(0, 
columnInfo.getStartIndex() + offset);
                     String suffix = template.substring(offset + 
columnInfo.getEndIndex());
                     if (prefix.endsWith("`")) {
@@ -159,56 +144,15 @@ public class StarRocksSaveModeUtil {
         return template;
     }
 
-    private static String dataTypeToStarrocksType(SeaTunnelDataType<?> 
dataType, long length) {
-        checkNotNull(dataType, "The SeaTunnel's data type is required.");
-        switch (dataType.getSqlType()) {
-            case NULL:
-            case TIME:
-                return "VARCHAR(8)";
-            case STRING:
-                if (length > 65533 || length <= 0) {
-                    return "STRING";
-                } else {
-                    return "VARCHAR(" + length + ")";
-                }
-            case BYTES:
-                return "STRING";
-            case BOOLEAN:
-                return "BOOLEAN";
-            case TINYINT:
-                return "TINYINT";
-            case SMALLINT:
-                return "SMALLINT";
-            case INT:
-                return "INT";
-            case BIGINT:
-                return "BIGINT";
-            case FLOAT:
-                return "FLOAT";
-            case DOUBLE:
-                return "DOUBLE";
-            case DATE:
-                return "DATE";
-            case TIMESTAMP:
-                return "DATETIME";
-            case ARRAY:
-                return "ARRAY<"
-                        + dataTypeToStarrocksType(
-                                ((ArrayType<?, ?>) dataType).getElementType(), 
Long.MAX_VALUE)
-                        + ">";
-            case DECIMAL:
-                DecimalType decimalType = (DecimalType) dataType;
-                return String.format(
-                        "Decimal(%d, %d)", decimalType.getPrecision(), 
decimalType.getScale());
-            case MAP:
-            case ROW:
-                return "JSON";
-            default:
+    public String getDropDatabaseSql(String database, boolean 
ignoreIfNotExists) {
+        if (ignoreIfNotExists) {
+            return "DROP DATABASE IF EXISTS `" + database + "`";
+        } else {
+            return "DROP DATABASE `" + database + "`";
         }
-        throw new IllegalArgumentException("Unsupported SeaTunnel's data type: 
" + dataType);
     }
 
-    public static String getCreateDatabaseSql(String database, boolean 
ignoreIfExists) {
+    public String getCreateDatabaseSql(String database, boolean 
ignoreIfExists) {
         if (ignoreIfExists) {
             return "CREATE DATABASE IF NOT EXISTS `" + database + "`";
         } else {
@@ -216,15 +160,7 @@ public class StarRocksSaveModeUtil {
         }
     }
 
-    public static String getDropDatabaseSql(String database, boolean 
ignoreIfNotExists) {
-        if (ignoreIfNotExists) {
-            return "DROP DATABASE IF EXISTS `" + database + "`";
-        } else {
-            return "DROP DATABASE `" + database + "`";
-        }
-    }
-
-    public static String getDropTableSql(TablePath tablePath, boolean 
ignoreIfNotExists) {
+    public String getDropTableSql(TablePath tablePath, boolean 
ignoreIfNotExists) {
         if (ignoreIfNotExists) {
             return "DROP TABLE IF EXISTS " + tablePath.getFullName();
         } else {
@@ -232,7 +168,7 @@ public class StarRocksSaveModeUtil {
         }
     }
 
-    public static String getTruncateTableSql(TablePath tablePath) {
+    public String getTruncateTableSql(TablePath tablePath) {
         return "TRUNCATE TABLE " + tablePath.getFullName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java
rename to 
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
index a911f1e1a2..7baff4c5c3 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/CreateTableParser.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CreateTableParser.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.util;
+package org.apache.seatunnel.connectors.seatunnel.common.util;
 
 import lombok.Getter;
 
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index c7ad6f6505..91ce2a51d2 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CreateTableParser;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index aecacf3eb2..ae97cccfa4 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -40,6 +40,7 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink.StarRocksSaveModeUtil;
 
@@ -203,11 +204,12 @@ public class StarRocksCatalog implements Catalog {
     public void createTable(TablePath tablePath, CatalogTable table, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
         this.createTable(
-                StarRocksSaveModeUtil.getCreateTableSql(
+                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                         template,
                         tablePath.getDatabaseName(),
                         tablePath.getTableName(),
-                        table.getTableSchema()));
+                        table.getTableSchema(),
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
     }
 
     @Override
@@ -215,7 +217,9 @@ public class StarRocksCatalog implements Catalog {
             throws TableNotExistException, CatalogException {
         try {
             conn.createStatement()
-                    .execute(StarRocksSaveModeUtil.getDropTableSql(tablePath, 
ignoreIfNotExists));
+                    .execute(
+                            StarRocksSaveModeUtil.INSTANCE.getDropTableSql(
+                                    tablePath, ignoreIfNotExists));
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed listing database in catalog %s", 
catalogName), e);
@@ -227,7 +231,7 @@ public class StarRocksCatalog implements Catalog {
         try {
             if (ignoreIfNotExists) {
                 conn.createStatement()
-                        
.execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+                        
.execute(StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
             }
         } catch (Exception e) {
             throw new CatalogException(
@@ -264,7 +268,7 @@ public class StarRocksCatalog implements Catalog {
         try {
             conn.createStatement()
                     .execute(
-                            StarRocksSaveModeUtil.getCreateDatabaseSql(
+                            
StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
                                     tablePath.getDatabaseName(), 
ignoreIfExists));
         } catch (Exception e) {
             throw new CatalogException(
@@ -278,7 +282,7 @@ public class StarRocksCatalog implements Catalog {
         try {
             conn.createStatement()
                     .execute(
-                            StarRocksSaveModeUtil.getDropDatabaseSql(
+                            StarRocksSaveModeUtil.INSTANCE.getDropDatabaseSql(
                                     tablePath.getDatabaseName(), 
ignoreIfNotExists));
         } catch (Exception e) {
             throw new CatalogException(
@@ -360,8 +364,6 @@ public class StarRocksCatalog implements Catalog {
         options.put("connector", "starrocks");
         options.put("url", baseUrl + tablePath.getDatabaseName());
         options.put("table-name", tablePath.getFullName());
-        options.put("username", username);
-        options.put("password", pwd);
         return options;
     }
 
@@ -497,18 +499,22 @@ public class StarRocksCatalog implements Catalog {
         if (actionType == ActionType.CREATE_TABLE) {
             Preconditions.checkArgument(catalogTable.isPresent(), 
"CatalogTable cannot be null");
             return new SQLPreviewResult(
-                    StarRocksSaveModeUtil.getCreateTableSql(
+                    StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                             template,
                             tablePath.getDatabaseName(),
                             tablePath.getTableName(),
-                            catalogTable.get().getTableSchema()));
+                            catalogTable.get().getTableSchema(),
+                            
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
         } else if (actionType == ActionType.DROP_TABLE) {
-            return new 
SQLPreviewResult(StarRocksSaveModeUtil.getDropTableSql(tablePath, true));
+            return new SQLPreviewResult(
+                    StarRocksSaveModeUtil.INSTANCE.getDropTableSql(tablePath, 
true));
         } else if (actionType == ActionType.TRUNCATE_TABLE) {
-            return new 
SQLPreviewResult(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
+            return new SQLPreviewResult(
+                    
StarRocksSaveModeUtil.INSTANCE.getTruncateTableSql(tablePath));
         } else if (actionType == ActionType.CREATE_DATABASE) {
             return new SQLPreviewResult(
-                    
StarRocksSaveModeUtil.getCreateDatabaseSql(tablePath.getDatabaseName(), true));
+                    StarRocksSaveModeUtil.INSTANCE.getCreateDatabaseSql(
+                            tablePath.getDatabaseName(), true));
         } else if (actionType == ActionType.DROP_DATABASE) {
             return new SQLPreviewResult(
                     "DROP DATABASE IF EXISTS `" + tablePath.getDatabaseName() 
+ "`");
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index a89f0347e3..02d3118e07 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -17,95 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
-import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sql.template.SqlTemplate;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.starrocks.util.CreateTableParser;
+import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
 
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 @Slf4j
-public class StarRocksSaveModeUtil {
-
-    public static String getCreateTableSql(
-            String template, String database, String table, TableSchema 
tableSchema) {
-        String primaryKey = "";
-        if (tableSchema.getPrimaryKey() != null) {
-            primaryKey =
-                    tableSchema.getPrimaryKey().getColumnNames().stream()
-                            .map(r -> "`" + r + "`")
-                            .collect(Collectors.joining(","));
-        }
-        String uniqueKey = "";
-        if (!tableSchema.getConstraintKeys().isEmpty()) {
-            uniqueKey =
-                    tableSchema.getConstraintKeys().stream()
-                            .flatMap(c -> c.getColumnNames().stream())
-                            .map(r -> "`" + r.getColumnName() + "`")
-                            .collect(Collectors.joining(","));
-        }
-        SqlTemplate.canHandledByTemplateWithPlaceholder(
-                template,
-                SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(),
-                primaryKey,
-                TablePath.of(database, table).getFullName(),
-                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
-        template =
-                template.replaceAll(
-                        
SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getReplacePlaceHolder(),
-                        primaryKey);
-        SqlTemplate.canHandledByTemplateWithPlaceholder(
-                template,
-                SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getPlaceHolder(),
-                uniqueKey,
-                TablePath.of(database, table).getFullName(),
-                StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+public class StarRocksSaveModeUtil extends CatalogUtil {
 
-        template =
-                template.replaceAll(
-                        
SaveModePlaceHolder.ROWTYPE_UNIQUE_KEY.getReplacePlaceHolder(), uniqueKey);
-        Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
-                CreateTableParser.getColumnList(template);
-        template = mergeColumnInTemplate(columnInTemplate, tableSchema, 
template);
-
-        String rowTypeFields =
-                tableSchema.getColumns().stream()
-                        .filter(column -> 
!columnInTemplate.containsKey(column.getName()))
-                        .map(StarRocksSaveModeUtil::columnToStarrocksType)
-                        .collect(Collectors.joining(",\n"));
-
-        if 
(template.contains(SaveModePlaceHolder.TABLE_NAME.getPlaceHolder())) {
-            // TODO: Remove this compatibility config
-            template =
-                    template.replaceAll(
-                            
SaveModePlaceHolder.TABLE_NAME.getReplacePlaceHolder(), table);
-            log.warn(
-                    "The variable placeholder `${table_name}` has been marked 
as deprecated and will be removed soon, please use `${table}`");
-        }
+    public static final StarRocksSaveModeUtil INSTANCE = new 
StarRocksSaveModeUtil();
 
-        return 
template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), 
database)
-                .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), 
table)
-                .replaceAll(
-                        
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields);
-    }
-
-    private static String columnToStarrocksType(Column column) {
+    public String columnToConnectorType(Column column) {
         checkNotNull(column, "The column is required.");
         return String.format(
                 "`%s` %s %s %s",
@@ -119,46 +48,6 @@ public class StarRocksSaveModeUtil {
                         : "COMMENT '" + column.getComment() + "'");
     }
 
-    private static String mergeColumnInTemplate(
-            Map<String, CreateTableParser.ColumnInfo> columnInTemplate,
-            TableSchema tableSchema,
-            String template) {
-        int offset = 0;
-        Map<String, Column> columnMap =
-                tableSchema.getColumns().stream()
-                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
-        List<CreateTableParser.ColumnInfo> columnInfosInSeq =
-                columnInTemplate.values().stream()
-                        .sorted(
-                                Comparator.comparingInt(
-                                        
CreateTableParser.ColumnInfo::getStartIndex))
-                        .collect(Collectors.toList());
-        for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
-            String col = columnInfo.getName();
-            if (StringUtils.isEmpty(columnInfo.getInfo())) {
-                if (columnMap.containsKey(col)) {
-                    Column column = columnMap.get(col);
-                    String newCol = columnToStarrocksType(column);
-                    String prefix = template.substring(0, 
columnInfo.getStartIndex() + offset);
-                    String suffix = template.substring(offset + columnInfo.getEndIndex());
-                    if (prefix.endsWith("`")) {
-                        prefix = prefix.substring(0, prefix.length() - 1);
-                        offset--;
-                    }
-                    if (suffix.startsWith("`")) {
-                        suffix = suffix.substring(1);
-                        offset--;
-                    }
-                    template = prefix + newCol + suffix;
-                    offset += newCol.length() - columnInfo.getName().length();
-                } else {
-                    throw new IllegalArgumentException("Can't find column " + col + " in table.");
-                }
-            }
-        }
-        return template;
-    }
-
    private static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType, long length) {
         checkNotNull(dataType, "The SeaTunnel's data type is required.");
         switch (dataType.getSqlType()) {
@@ -207,32 +96,4 @@ public class StarRocksSaveModeUtil {
         }
        throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
     }
-
-    public static String getCreateDatabaseSql(String database, boolean ignoreIfExists) {
-        if (ignoreIfExists) {
-            return "CREATE DATABASE IF NOT EXISTS `" + database + "`";
-        } else {
-            return "CREATE DATABASE `" + database + "`";
-        }
-    }
-
-    public static String getDropDatabaseSql(String database, boolean ignoreIfNotExists) {
-        if (ignoreIfNotExists) {
-            return "DROP DATABASE IF EXISTS `" + database + "`";
-        } else {
-            return "DROP DATABASE `" + database + "`";
-        }
-    }
-
-    public static String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {
-        if (ignoreIfNotExists) {
-            return "DROP TABLE IF EXISTS " + tablePath.getFullName();
-        } else {
-            return "DROP TABLE " + tablePath.getFullName();
-        }
-    }
-
-    public static String getTruncateTableSql(TablePath tablePath) {
-        return "TRUNCATE TABLE " + tablePath.getFullName();
-    }
 }
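
The refactor above moves the generic save-mode templating out of the connector class: StarRocksSaveModeUtil now extends the shared CatalogUtil and keeps only its column-to-type mapping (columnToConnectorType), while the removed placeholder-replacement, column-merge, and database/table DDL helpers appear to live in the base class. Callers also pass the connector's own create-template option key, as the updated tests do with StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key(). A minimal sketch of how another connector could plug into the same base class, assuming CatalogUtil exposes columnToConnectorType as the per-connector hook; the ExampleSaveModeUtil name and the type mapping are illustrative, not part of this commit:

    // Illustrative sketch only: a connector-specific save-mode util built on the shared CatalogUtil.
    public class ExampleSaveModeUtil extends CatalogUtil {

        public static final ExampleSaveModeUtil INSTANCE = new ExampleSaveModeUtil();

        // Only the column-to-DDL mapping is connector specific; the template handling is inherited.
        public String columnToConnectorType(Column column) {
            checkNotNull(column, "The column is required.");
            // Deliberately simplified mapping for the sketch: strings to STRING, everything else to BIGINT.
            String type = column.getDataType().getSqlType() == SqlType.STRING ? "STRING" : "BIGINT";
            return String.format("`%s` %s", column.getName(), type);
        }
    }

DDL would then be rendered the same way the updated StarRocks tests do, e.g. ExampleSaveModeUtil.INSTANCE.getCreateTableSql(template, database, table, tableSchema, createTemplateOptionKey).
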
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index 763413335a..c4d0616733 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -65,7 +65,7 @@ public class StarRocksCreateTableTest {
                PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.getCreateTableSql(
+                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                        "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (                                                                                                                           \n"
                                 + "${rowtype_primary_key}  ,       \n"
                                 + "${rowtype_unique_key} , \n"
@@ -110,7 +110,8 @@ public class StarRocksCreateTableTest {
                                                                                         .ColumnSortType
                                                                                         .ASC)))))
                                 .columns(columns)
-                                .build());
+                                .build(),
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
        Assertions.assertEquals(
                "CREATE TABLE IF NOT EXISTS `test1`.`test2` (                                                                                                    \n"
                         + "`id` BIGINT NULL ,`age` INT NULL   ,       \n"
@@ -149,11 +150,12 @@ public class StarRocksCreateTableTest {
                 Assertions.assertThrows(
                         RuntimeException.class,
                         () ->
-                                StarRocksSaveModeUtil.getCreateTableSql(
+                                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                                         createTemplate,
                                         tablePath.getDatabaseName(),
                                         tablePath.getTableName(),
-                                        catalogTable.getTableSchema()));
+                                        catalogTable.getTableSchema(),
+                                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()));
        String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
         SeaTunnelRuntimeException exceptSeaTunnelRuntimeException =
                 CommonError.sqlTemplateHandledError(
@@ -233,7 +235,7 @@
                        "L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.getCreateTableSql(
+                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                        "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
                                 + "`L_COMMITDATE`,\n"
                                 + "${rowtype_primary_key},\n"
@@ -252,7 +254,8 @@ public class StarRocksCreateTableTest {
                                        PrimaryKey.of(
                                                "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
                                 .columns(columns)
-                                .build());
+                                .build(),
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
         String expected =
                 "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
                         + "`L_COMMITDATE` DATE NOT NULL ,\n"
@@ -290,7 +293,7 @@ public class StarRocksCreateTableTest {
        columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.getCreateTableSql(
+                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                        "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (                                                                                                                           \n"
                                 + "${rowtype_primary_key}  ,       \n"
                                 + "`create_time` DATETIME NOT NULL ,  \n"
@@ -312,7 +315,8 @@ public class StarRocksCreateTableTest {
                         TableSchema.builder()
                                .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age")))
                                 .columns(columns)
-                                .build());
+                                .build(),
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
 
        Assertions.assertEquals(
                "CREATE TABLE IF NOT EXISTS `test1`.`test2` (                                                                                                    \n"
@@ -347,7 +351,7 @@ public class StarRocksCreateTableTest {
        columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, ""));
 
         String result =
-                StarRocksSaveModeUtil.getCreateTableSql(
+                StarRocksSaveModeUtil.INSTANCE.getCreateTableSql(
                         "create table '${database}'.'${table}'(\n"
                                 + "     ${rowtype_fields}\n"
                                 + " )\n"
@@ -358,7 +362,8 @@ public class StarRocksCreateTableTest {
                                 .primaryKey(
                                        PrimaryKey.of("test", Arrays.asList("id", "age", "name")))
                                 .columns(columns)
-                                .build());
+                                .build(),
+                        StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
 
         Assertions.assertEquals(
                 "create table 'test1'.'test2'(\n"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 76bdfaa281..b830a11389 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -107,6 +107,86 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 
+    @TestTemplate
+    public void clickhouseWithCreateSchemaWhenNotExist(TestContainer container) throws Exception {
+        String tableName = "default.sink_table_for_schema";
+        Container.ExecResult execResult =
+                container.executeJob("/clickhouse_with_create_schema_when_not_exist.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        execResult = container.executeJob("/clickhouse_with_create_schema_when_not_exist.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(200, countData(tableName));
+        dropTable(tableName);
+    }
+
+    @TestTemplate
+    public void clickhouseWithRecreateSchemaAndAppendData(TestContainer container)
+            throws Exception {
+        String tableName = "default.sink_table_for_schema";
+        Container.ExecResult execResult =
+                container.executeJob("/clickhouse_with_recreate_schema_and_append_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        execResult = container.executeJob("/clickhouse_with_recreate_schema_and_append_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        dropTable(tableName);
+    }
+
+    @TestTemplate
+    public void clickhouseWithErrorWhenSchemaNotExist(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/clickhouse_with_error_when_schema_not_exist.conf");
+        Assertions.assertEquals(1, execResult.getExitCode());
+        Assertions.assertTrue(
+                execResult
+                        .getStderr()
+                        .contains(
+                                "ErrorCode:[API-11], ErrorDescription:[The sink table not exist]"));
+    }
+
+    @TestTemplate
+    public void clickhouseWithCreateSchemaWhenNotExistAndDropData(TestContainer container)
+            throws Exception {
+        String tableName = "default.sink_table_for_schema";
+        Container.ExecResult execResult =
+                container.executeJob(
+                        "/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        execResult =
+                container.executeJob(
+                        "/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        dropTable(tableName);
+    }
+
+    @TestTemplate
+    public void clickhouseWithErrorWhenDataExists(TestContainer container) throws Exception {
+        String tableName = "default.sink_table_for_schema";
+        Container.ExecResult execResult =
+                container.executeJob("/clickhouse_with_error_when_data_exists.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(100, countData(tableName));
+        execResult = container.executeJob("/clickhouse_with_error_when_data_exists.conf");
+        Assertions.assertEquals(1, execResult.getExitCode());
+        Assertions.assertTrue(
+                execResult.getStderr().contains("The target data source already has data"));
+        dropTable(tableName);
+    }
+
+    @TestTemplate
+    public void clickhouseRecreateSchemaAndCustom(TestContainer container) throws Exception {
+        String tableName = "default.sink_table_for_schema";
+        Container.ExecResult execResult =
+                container.executeJob("/clickhouse_with_recreate_schema_and_custom.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(101, countData(tableName));
+        dropTable(tableName);
+    }
+
     @BeforeAll
     @Override
     public void startUp() throws Exception {
@@ -194,6 +274,29 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
         return connection.createArrayOf(sqlType, elements);
     }
 
+    private int countData(String tableName) {
+        try {
+            String sql = "select count(1) from " + tableName;
+            ResultSet resultSet = this.connection.createStatement().executeQuery(sql);
+            if (resultSet.next()) {
+                return resultSet.getInt(1);
+            } else {
+                return -1;
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void dropTable(String tableName) {
+        try {
+            Statement statement = this.connection.createStatement();
+            statement.execute("drop table if exists " + tableName);
+        } catch (SQLException e) {
+            throw new RuntimeException("Drop table failed!", e);
+        }
+    }
+
     private void batchInsertData() {
         String sql = CONFIG.getString(INSERT_SQL);
         PreparedStatement preparedStatement = null;
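
The countData helper added above opens a Statement and ResultSet without closing them; a try-with-resources variant of the same count query (a sketch only, not the code in this commit) closes both handles automatically:

    // Sketch: same count query with the JDBC resources auto-closed (illustrative, not part of this commit).
    private int countData(String tableName) {
        String sql = "select count(1) from " + tableName;
        try (Statement statement = this.connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql)) {
            return resultSet.next() ? resultSet.getInt(1) : -1;
        } catch (SQLException e) {
            throw new RuntimeException("Query " + sql + " failed", e);
        }
    }
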
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf
new file mode 100644
index 0000000000..a05d856ff6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf
new file mode 100644
index 0000000000..cbb772e6da
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_not_exist_and_drop_data.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="DROP_DATA"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf
new file mode 100644
index 0000000000..42d4ce8fb1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_data_exists.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="ERROR_WHEN_DATA_EXISTS"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf
new file mode 100644
index 0000000000..39983a90f5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_error_when_schema_not_exist.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    "schema_save_mode"="ERROR_WHEN_SCHEMA_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf
new file mode 100644
index 0000000000..057252aeba
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_append_data.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="APPEND_DATA"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf
new file mode 100644
index 0000000000..0d3e56cd8b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_recreate_schema_and_custom.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_date = date
+        c_time = time
+        c_timestamp = timestamp
+        c_map = "map<string, int>"
+        c_array = "array<int>"
+      }
+      primaryKey {
+        name = "c_string"
+        columnNames = [c_string]
+      }
+    }
+    row.num = 100
+  }
+}
+
+sink {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    table = "sink_table_for_schema"
+    username = "default"
+    password = ""
+    custom_sql="INSERT INTO default.sink_table_for_schema ( c_string) VALUES ( '1' );"
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="CUSTOM_PROCESSING"
+    primary_key = "c_string"
+    support_upsert = true
+    allow_experimental_lightweight_delete = true
+  }
+}
\ No newline at end of file

