This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b2375fffe8 [Feature] Support SaveMode on Doris (#6085)
b2375fffe8 is described below

commit b2375fffe803083ea89c201e38c8396e2def7cbe
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Jan 4 14:52:10 2024 +0800

    [Feature] Support SaveMode on Doris (#6085)
---
 docs/en/connector-v2/sink/Doris.md                 | 112 +++++--
 .../seatunnel/api/sink/DefaultSaveModeHandler.java |   3 +
 .../apache/seatunnel/api/table/catalog/Column.java |   3 +
 .../api/table/catalog/MetadataColumn.java          |  12 +
 .../api/table/catalog/PhysicalColumn.java          |  17 +
 .../connectors/doris/catalog/DorisCatalog.java     |  25 ++
 .../doris/catalog/DorisCatalogFactory.java         |   4 +
 .../connectors/doris/config/DorisConfig.java       |   6 +-
 .../connectors/doris/config/DorisOptions.java      |  94 ++++--
 .../connectors/doris/rest/PartitionDefinition.java | 147 --------
 .../connectors/doris/rest/RestService.java         | 373 +--------------------
 .../connectors/doris/rest/models/Backend.java      |  40 ---
 .../connectors/doris/rest/models/BackendRow.java   |  41 ---
 .../connectors/doris/rest/models/Field.java        | 134 --------
 .../connectors/doris/rest/models/QueryPlan.java    |  70 ----
 .../connectors/doris/rest/models/Schema.java       | 108 ------
 .../connectors/doris/rest/models/Tablet.java       |  80 -----
 .../seatunnel/connectors/doris/sink/DorisSink.java |  47 ++-
 .../connectors/doris/sink/DorisSinkFactory.java    |  59 +++-
 .../doris/sink/writer/DorisSinkState.java          |   4 +-
 .../doris/sink/writer/DorisSinkWriter.java         |  33 +-
 .../doris/sink/writer/DorisStreamLoad.java         |   7 +-
 .../connectors/doris/util/DorisCatalogUtil.java    |  80 +++--
 .../seatunnel/connectors/doris/util/IOUtils.java   |  49 ---
 .../doris/util/UnsupportedTypeConverterUtils.java  | 101 ++++++
 .../doris/catalog/DorisCreateTableTest.java        | 280 ++++++++++++++++
 .../e2e/connector/doris/DorisCatalogIT.java        | 152 +++++++--
 .../resources/write-cdc-changelog-to-doris.conf    |   3 +-
 .../src/test/resources/doris-jdbc-to-doris.conf    |   3 +-
 29 files changed, 935 insertions(+), 1152 deletions(-)

diff --git a/docs/en/connector-v2/sink/Doris.md 
b/docs/en/connector-v2/sink/Doris.md
index de0c47453a..a485eaf8c7 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -32,21 +32,91 @@ Version Supported
 
 ## Sink Options
 
-|        Name         |  Type  | Required |  Default   |                       
                                                                                
                                  Description                                   
                                                                                
                       |
-|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| fenodes             | String | Yes      | -          | `Doris` cluster 
fenodes address, the format is `"fe_ip:fe_http_port, ..."`                      
                                                                                
                                                                                
                             |
-| username            | String | Yes      | -          | `Doris` user username 
                                                                                
                                                                                
                                                                                
                       |
-| password            | String | Yes      | -          | `Doris` user password 
                                                                                
                                                                                
                                                                                
                       |
-| table.identifier    | String | Yes      | -          | The name of `Doris` 
table                                                                           
                                                                                
                                                                                
                         |
-| sink.label-prefix   | String | Yes      | -          | The label prefix used 
by stream load imports. In the 2pc scenario, global uniqueness is required to 
ensure the EOS semantics of SeaTunnel.                                          
                                                                                
                         |
-| sink.enable-2pc     | bool   | No       | -          | Whether to enable 
two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. 
For two-phase commit, please refer to 
[here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
                                     |
-| sink.enable-delete  | bool   | No       | -          | Whether to enable 
deletion. This option requires Doris table to enable batch delete function 
(0.15+ version is enabled by default), and only supports Unique model. you can 
get more detail at this 
[link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual)
 |
-| sink.check-interval | int    | No       | 10000      | check exception with 
the interval while loading                                                      
                                                                                
                                                                                
                        |
-| sink.max-retries    | int    | No       | 3          | the max retry times 
if writing records to database failed                                           
                                                                                
                                                                                
                         |
-| sink.buffer-size    | int    | No       | 256 * 1024 | the buffer size to 
cache data for stream load.                                                     
                                                                                
                                                                                
                          |
-| sink.buffer-count   | int    | No       | 3          | the buffer count to 
cache data for stream load.                                                     
                                                                                
                                                                                
                         |
-| doris.batch.size    | int    | No       | 1024       | the batch size of the 
write to doris each http request, when the row reaches the size or checkpoint 
is executed, the data of cached will write to server.                           
                                                                                
                         |
-| doris.config        | map    | yes      | -          | This option is used 
to support operations such as `insert`, `delete`, and `update` when 
automatically generate sql,and supported formats.                               
                                                                                
                                     |
+|              Name              |  Type   | Required |           Default      
      |                                                                         
                                                                Description     
                                                                                
                                                     |
+|--------------------------------|---------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| fenodes                        | String  | Yes      | -                      
      | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, 
..."`                                                                           
                                                                                
                                                        |
+| query-port                     | int     | No       | 9030                   
      | `Doris` Fenodes query_port                                              
                                                                                
                                                                                
                                                     |
+| username                       | String  | Yes      | -                      
      | `Doris` user username                                                   
                                                                                
                                                                                
                                                     |
+| password                       | String  | Yes      | -                      
      | `Doris` user password                                                   
                                                                                
                                                                                
                                                     |
+| database                       | String  | Yes      | -                      
      | The database name of `Doris` table, use `${database_name}` to represent 
the upstream database name                                                      
                                                                                
                                                     |
+| table                          | String  | Yes      | -                      
      | The table name of `Doris` table,  use `${table_name}` to represent the 
upstream table name                                                             
                                                                                
                                                      |
+| table.identifier               | String  | Yes      | -                      
      | The name of `Doris` table, it will be deprecated after version 2.3.5, 
please use `database` and `table` instead.                                      
                                                                                
                                                           |
+| sink.label-prefix              | String  | Yes      | -                      
      | The label prefix used by stream load imports. In the 2pc scenario, 
global uniqueness is required to ensure the EOS semantics of SeaTunnel.         
                                                                                
                                                          |
+| sink.enable-2pc                | bool    | No       | -                      
      | Whether to enable two-phase commit (2pc), the default is true, to 
ensure Exactly-Once semantics. For two-phase commit, please refer to 
[here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
                                     |
+| sink.enable-delete             | bool    | No       | -                      
      | Whether to enable deletion. This option requires Doris table to enable 
batch delete function (0.15+ version is enabled by default), and only supports 
Unique model. you can get more detail at this 
[link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual)
 |
+| sink.check-interval            | int     | No       | 10000                  
      | check exception with the interval while loading                         
                                                                                
                                                                                
                                                     |
+| sink.max-retries               | int     | No       | 3                      
      | the max retry times if writing records to database failed               
                                                                                
                                                                                
                                                     |
+| sink.buffer-size               | int     | No       | 256 * 1024             
      | the buffer size to cache data for stream load.                          
                                                                                
                                                                                
                                                     |
+| sink.buffer-count              | int     | No       | 3                      
      | the buffer count to cache data for stream load.                         
                                                                                
                                                                                
                                                     |
+| doris.batch.size               | int     | No       | 1024                   
      | the batch size of the write to doris each http request, when the row 
reaches the size or checkpoint is executed, the data of cached will write to 
server.                                                                         
                                                           |
+| needs_unsupported_type_casting | boolean | No       | false                  
      | Whether to enable the unsupported type casting, such as Decimal64 to 
Double                                                                          
                                                                                
                                                        |
+| schema_save_mode               | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to 
`schema_save_mode` below                                                        
                                                                                
                                                                                
       |
+| data_save_mode                 | Enum    | no       | APPEND_DATA            
      | the data save mode, please refer to `data_save_mode` below              
                                                                                
                                                                                
                                                     |
+| save_mode_create_template      | string  | no       | see below              
      | see below                                                               
                                                                                
                                                                                
                                                     |
+| custom_sql                     | String  | no       | -                      
      | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the 
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be 
executed. SQL will be executed before synchronization tasks.                    
                                                               |
+| doris.config                   | map     | yes      | -                      
      | This option is used to support operations such as `insert`, `delete`, 
and `update` when automatically generate sql,and supported formats.             
                                                                                
                                                       |
+
+### schema_save_mode[Enum]
+
+Before the synchronization task starts, choose how to handle the existing 
table structure on the target side.  
+Option introduction:  
+`RECREATE_SCHEMA`: Creates the table when it does not exist; drops and 
recreates it when it already exists  
+`CREATE_SCHEMA_WHEN_NOT_EXIST`: Creates the table when it does not exist; 
skips creation when it already exists  
+`ERROR_WHEN_SCHEMA_NOT_EXIST`: Reports an error when the table does not 
exist
+
+### data_save_mode[Enum]
+
+Before the synchronization task starts, choose how to handle data that 
already exists on the target side.  
+Option introduction:  
+`DROP_DATA`: Preserve database structure and delete data  
+`APPEND_DATA`:Preserve database structure, preserve data  
+`CUSTOM_PROCESSING`:User defined processing  
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### save_mode_create_template
+
+We use templates to automatically create Doris tables,
+which will create corresponding table creation statements based on the type of 
upstream data and schema type,
+and the default template can be modified according to the situation.
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(
+    ${rowtype_fields}
+) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
+    DISTRIBUTED BY HASH (${rowtype_primary_key})
+    PROPERTIES
+(
+    "replication_num" = "1"
+);
+```
+
+If a custom field is filled in the template, such as adding an `id` field
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(   
+    id,
+    ${rowtype_fields}
+) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
+    DISTRIBUTED BY HASH (${rowtype_primary_key})
+    PROPERTIES
+(
+    "replication_num" = "1"
+);
+```
+
+The connector will automatically obtain the type of that field from the 
upstream schema to complete its definition,
+and remove the `id` field from `rowtype_fields`. This approach can be used to 
customize field types and attributes.
+
+You can use the following placeholders
+
+- database: Used to get the database in the upstream schema
+- table_name: Used to get the table name in the upstream schema
+- rowtype_fields: Used to get all the fields in the upstream schema, we will 
automatically map to the field
+  description of Doris
+- rowtype_primary_key: Used to get the primary key in the upstream schema 
(maybe a list)
+- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe 
a list)
 
 ## Data Type Mapping
 
@@ -125,7 +195,8 @@ sink {
     fenodes = "doris_cdc_e2e:8030"
     username = root
     password = ""
-    table.identifier = "test.e2e_table_sink"
+    database = "test"
+    table = "e2e_table_sink"
     sink.label-prefix = "test-cdc"
     sink.enable-2pc = "true"
     sink.enable-delete = "true"
@@ -197,7 +268,8 @@ sink {
     fenodes = "doris_cdc_e2e:8030"
     username = root
     password = ""
-    table.identifier = "test.e2e_table_sink"
+    database = "test"
+    table = "e2e_table_sink"
     sink.label-prefix = "test-cdc"
     sink.enable-2pc = "true"
     sink.enable-delete = "true"
@@ -218,7 +290,8 @@ sink {
         fenodes = "e2e_dorisdb:8030"
         username = root
         password = ""
-        table.identifier = "test.e2e_table_sink"
+        database = "test"
+        table = "e2e_table_sink"
         sink.enable-2pc = "true"
         sink.label-prefix = "test_json"
         doris.config = {
@@ -238,7 +311,8 @@ sink {
         fenodes = "e2e_dorisdb:8030"
         username = root
         password = ""
-        table.identifier = "test.e2e_table_sink"
+        database = "test"
+        table = "e2e_table_sink"
         sink.enable-2pc = "true"
         sink.label-prefix = "test_csv"
         doris.config = {
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
index e9ce7b261c..9566658979 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java
@@ -136,6 +136,9 @@ public class DefaultSaveModeHandler implements 
SaveModeHandler {
     }
 
     protected void createTable() {
+        // Create the target database first when it does not exist yet.
+        final String databaseName = tablePath.getDatabaseName();
+        if (!catalog.databaseExists(databaseName)) {
+            catalog.createDatabase(TablePath.of(databaseName, ""), true);
+        }
         catalog.createTable(tablePath, catalogTable, true);
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
index bec10b3d75..109d025695 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java
@@ -133,4 +133,7 @@ public abstract class Column implements Serializable {
 
     /** Returns a copy of the column. */
     public abstract Column copy();
+
+    /** Returns a copy of the column with a replaced name. */
+    public abstract Column rename(String newColumnName);
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
index 5325dac791..9294c13506 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java
@@ -70,4 +70,16 @@ public class MetadataColumn extends Column {
         return MetadataColumn.of(
                 name, dataType, columnLength, metadataKey, nullable, 
defaultValue, comment);
     }
+
+    /** Builds a metadata column with the given name and all other attributes unchanged. */
+    @Override
+    public Column rename(String newColumnName) {
+        final Column renamed =
+                MetadataColumn.of(
+                        newColumnName,
+                        dataType,
+                        columnLength,
+                        metadataKey,
+                        nullable,
+                        defaultValue,
+                        comment);
+        return renamed;
+    }
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
index 164752d468..926dcc2eba 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java
@@ -144,4 +144,21 @@ public class PhysicalColumn extends Column {
                 options,
                 longColumnLength);
     }
+
+    /** Builds a physical column with the given name and all other attributes unchanged. */
+    @Override
+    public Column rename(String newColumnName) {
+        final Column renamed =
+                PhysicalColumn.of(
+                        newColumnName,
+                        dataType,
+                        columnLength,
+                        nullable,
+                        defaultValue,
+                        comment,
+                        sourceType,
+                        isUnsigned,
+                        isZeroFill,
+                        bitLen,
+                        options,
+                        longColumnLength);
+        return renamed;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index 071bf1f608..0e5faef550 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -334,4 +334,29 @@ public class DorisCatalog implements Catalog {
         options.put(DorisOptions.PASSWORD.key(), password);
         return options;
     }
+
+    /**
+     * Removes all rows of the given table by executing a {@code TRUNCATE TABLE} statement.
+     *
+     * <p>The statement is executed for both values of {@code ignoreIfNotExists}. Previously it
+     * ran only when the flag was {@code true}, so passing {@code false} silently did nothing.
+     *
+     * @param tablePath path of the table to truncate
+     * @param ignoreIfNotExists currently does not change the outcome. NOTE(review): skipping a
+     *     missing table when this is {@code true} is not implemented here; a failing statement
+     *     is always surfaced as a {@link CatalogException}.
+     * @throws CatalogException if the statement cannot be executed
+     */
+    public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        // try-with-resources closes the statement, which used to be left open.
+        try (java.sql.Statement statement = conn.createStatement()) {
+            statement.execute(String.format("TRUNCATE TABLE  %s", tablePath.getFullName()));
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed TRUNCATE TABLE in catalog %s", tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    public boolean isExistsData(TablePath tablePath) {
+        String tableName = tablePath.getFullName();
+        String sql = String.format("select * from %s limit 1;", tableName);
+        try (PreparedStatement ps = conn.prepareStatement(sql)) {
+            ResultSet resultSet = ps.executeQuery();
+            return resultSet.next();
+        } catch (SQLException e) {
+            throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
index 3df6bcca24..16db24f48a 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
@@ -21,9 +21,13 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
 
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
 public class DorisCatalogFactory implements CatalogFactory {
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
index 98a43c7b8e..ea93b8f551 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -41,6 +41,7 @@ import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_RE
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_SINK_CONFIG_PREFIX;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_TABLET_SIZE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE;
@@ -51,7 +52,6 @@ import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENA
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENABLE_DELETE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_LABEL_PREFIX;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_MAX_RETRIES;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
 
 @Setter
@@ -64,7 +64,6 @@ public class DorisConfig implements Serializable {
     private String username;
     private String password;
     private Integer queryPort;
-    private String tableIdentifier;
     private int batchSize;
 
     // source option
@@ -89,6 +88,7 @@ public class DorisConfig implements Serializable {
     private Integer bufferSize;
     private Integer bufferCount;
     private Properties streamLoadProps;
+    private boolean needsUnsupportedTypeCasting;
 
     // create table option
     private String createTableTemplate;
@@ -105,7 +105,6 @@ public class DorisConfig implements Serializable {
         dorisConfig.setFrontends(config.get(FENODES));
         dorisConfig.setUsername(config.get(USERNAME));
         dorisConfig.setPassword(config.get(PASSWORD));
-        dorisConfig.setTableIdentifier(config.get(TABLE_IDENTIFIER));
         dorisConfig.setQueryPort(config.get(QUERY_PORT));
         dorisConfig.setStreamLoadProps(parseStreamLoadProperties(config));
 
@@ -129,6 +128,7 @@ public class DorisConfig implements Serializable {
         dorisConfig.setBufferSize(config.get(SINK_BUFFER_SIZE));
         dorisConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
         dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
+        
dorisConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));
 
         // create table option
         
dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 04a4f02851..7dd4612c48 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -17,17 +17,18 @@
 
 package org.apache.seatunnel.connectors.doris.config;
 
-import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.DataSaveMode;
+import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import java.util.Map;
 
+import static 
org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA;
+
 public interface DorisOptions {
 
-    int DORIS_TABLET_SIZE_MIN = 1;
     int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
     int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
     int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
@@ -36,32 +37,11 @@ public interface DorisOptions {
     Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
     int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
     int DORIS_BATCH_SIZE_DEFAULT = 1024;
-    long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
     int DEFAULT_SINK_CHECK_INTERVAL = 10000;
     int DEFAULT_SINK_MAX_RETRIES = 3;
     int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
     int DEFAULT_SINK_BUFFER_COUNT = 3;
 
-    Map<String, String> DEFAULT_CREATE_PROPERTIES =
-            ImmutableMap.of(
-                    "replication_allocation", "tag.location.default: 3",
-                    "storage_format", "V2",
-                    "disable_auto_compaction", "false");
-
-    String DEFAULT_CREATE_TEMPLATE =
-            "CREATE TABLE ${table_identifier}\n"
-                    + "(\n"
-                    + "${column_definition}\n"
-                    + ")\n"
-                    + "ENGINE = ${engine_type}\n"
-                    + "UNIQUE KEY (${key_columns})\n"
-                    + "COMMENT ${table_comment}\n"
-                    + "${partition_info}\n"
-                    + "DISTRIBUTED BY HASH (${distribution_columns}) BUCKETS ${distribution_bucket}\n"
-                    + "PROPERTIES (\n"
-                    + "${properties}\n"
-                    + ")\n";
-
     // common option
     Option<String> FENODES =
             Options.key("fenodes")
@@ -75,11 +55,13 @@ public interface DorisOptions {
                     .defaultValue(9030)
                     .withDescription("doris query port");
 
+    @Deprecated
     Option<String> TABLE_IDENTIFIER =
             Options.key("table.identifier")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the doris table name.");
+
     Option<String> USERNAME =
             Options.key("username")
                     .stringType()
@@ -90,6 +72,17 @@ public interface DorisOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the doris password.");
+
+    Option<String> TABLE =
+            Options.key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris table name.");
+    Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris database name.");
     Option<Integer> DORIS_BATCH_SIZE =
             Options.key("doris.batch.size")
                     .intType()
@@ -197,6 +190,28 @@ public interface DorisOptions {
                     .defaultValue("information_schema")
                     .withDescription("");
 
+    Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+            Options.key("schema_save_mode")
+                    .enumType(SchemaSaveMode.class)
+                    .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
+                    .withDescription("schema_save_mode");
+
+    Option<DataSaveMode> DATA_SAVE_MODE =
+            Options.key("data_save_mode")
+                    .enumType(DataSaveMode.class)
+                    .defaultValue(DataSaveMode.APPEND_DATA)
+                    .withDescription("data_save_mode");
+
+    Option<String> CUSTOM_SQL =
+            Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
+
+    Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
+            Options.key("needs_unsupported_type_casting")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable the unsupported type casting, such as Decimal64 to Double");
+
     // create table
     Option<String> SAVE_MODE_CREATE_TEMPLATE =
             Options.key("save_mode_create_template")
@@ -205,17 +220,38 @@ public interface DorisOptions {
                             "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n"
                                     + "${rowtype_fields}\n"
                                     + ") ENGINE=OLAP\n"
-                                    + "UNIQUE KEY (${rowtype_primary_key})\n"
-                                    + "DISTRIBUTED BY HASH (${rowtype_primary_key})\n"
+                                    + " UNIQUE KEY (${rowtype_primary_key})\n"
+                                    + "DISTRIBUTED BY HASH (${rowtype_primary_key})\n "
                                     + "PROPERTIES (\n"
-                                    + "    \"replication_num\" = \"1\" \n"
+                                    + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+                                    + "\"in_memory\" = \"false\",\n"
+                                    + "\"storage_format\" = \"V2\",\n"
+                                    + "\"disable_auto_compaction\" = \"false\"\n"
                                     + ")")
                     .withDescription("Create table statement template, used to create Doris table");
 
     OptionRule.Builder SINK_RULE =
             OptionRule.builder()
-                    .required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER)
-                    .optional(DORIS_BATCH_SIZE);
+                    .required(
+                            FENODES,
+                            USERNAME,
+                            PASSWORD,
+                            SINK_LABEL_PREFIX,
+                            DORIS_SINK_CONFIG_PREFIX,
+                            DATA_SAVE_MODE,
+                            SCHEMA_SAVE_MODE)
+                    .optional(
+                            DATABASE,
+                            TABLE,
+                            TABLE_IDENTIFIER,
+                            QUERY_PORT,
+                            DORIS_BATCH_SIZE,
+                            SINK_ENABLE_2PC,
+                            SINK_ENABLE_DELETE,
+                            MULTI_TABLE_SINK_REPLICA,
+                            SAVE_MODE_CREATE_TEMPLATE,
+                            NEEDS_UNSUPPORTED_TYPE_CASTING)
+                    .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL);
 
     OptionRule.Builder CATALOG_RULE =
             OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
deleted file mode 100644
index 884cd7bde4..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.seatunnel.connectors.doris.rest;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-/** Doris partition info. */
-public class PartitionDefinition implements Serializable, 
Comparable<PartitionDefinition> {
-    private final String database;
-    private final String table;
-
-    private final String beAddress;
-    private final Set<Long> tabletIds;
-    private final String queryPlan;
-
-    public PartitionDefinition(
-            String database, String table, String beAddress, Set<Long> 
tabletIds, String queryPlan)
-            throws IllegalArgumentException {
-        this.database = database;
-        this.table = table;
-        this.beAddress = beAddress;
-        this.tabletIds = tabletIds;
-        this.queryPlan = queryPlan;
-    }
-
-    public String getBeAddress() {
-        return beAddress;
-    }
-
-    public Set<Long> getTabletIds() {
-        return tabletIds;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public String getQueryPlan() {
-        return queryPlan;
-    }
-
-    @Override
-    public int compareTo(PartitionDefinition o) {
-        int cmp = database.compareTo(o.database);
-        if (cmp != 0) {
-            return cmp;
-        }
-        cmp = table.compareTo(o.table);
-        if (cmp != 0) {
-            return cmp;
-        }
-        cmp = beAddress.compareTo(o.beAddress);
-        if (cmp != 0) {
-            return cmp;
-        }
-        cmp = queryPlan.compareTo(o.queryPlan);
-        if (cmp != 0) {
-            return cmp;
-        }
-
-        cmp = tabletIds.size() - o.tabletIds.size();
-        if (cmp != 0) {
-            return cmp;
-        }
-
-        Set<Long> similar = new HashSet<>(tabletIds);
-        Set<Long> diffSelf = new HashSet<>(tabletIds);
-        Set<Long> diffOther = new HashSet<>(o.tabletIds);
-        similar.retainAll(o.tabletIds);
-        diffSelf.removeAll(similar);
-        diffOther.removeAll(similar);
-        if (diffSelf.size() == 0) {
-            return 0;
-        }
-        long diff = Collections.min(diffSelf) - Collections.min(diffOther);
-        return diff < 0 ? -1 : 1;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        PartitionDefinition that = (PartitionDefinition) o;
-        return Objects.equals(database, that.database)
-                && Objects.equals(table, that.table)
-                && Objects.equals(beAddress, that.beAddress)
-                && Objects.equals(tabletIds, that.tabletIds)
-                && Objects.equals(queryPlan, that.queryPlan);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = database.hashCode();
-        result = 31 * result + table.hashCode();
-        result = 31 * result + beAddress.hashCode();
-        result = 31 * result + queryPlan.hashCode();
-        result = 31 * result + tabletIds.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "PartitionDefinition{"
-                + ", database='"
-                + database
-                + '\''
-                + ", table='"
-                + table
-                + '\''
-                + ", beAddress='"
-                + beAddress
-                + '\''
-                + ", tabletIds="
-                + tabletIds
-                + ", queryPlan='"
-                + queryPlan
-                + '\''
-                + '}';
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index e1c83a0700..e8b66d4497 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -21,12 +21,7 @@ import 
org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.models.Backend;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendRow;
 import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
-import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan;
-import org.apache.seatunnel.connectors.doris.rest.models.Schema;
-import org.apache.seatunnel.connectors.doris.rest.models.Tablet;
 import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
 
 import org.apache.commons.io.IOUtils;
@@ -36,7 +31,6 @@ import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.entity.StringEntity;
 
 import org.slf4j.Logger;
 
@@ -55,30 +49,16 @@ import java.io.Serializable;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 @Slf4j
 public class RestService implements Serializable {
-    public static final int REST_RESPONSE_STATUS_OK = 200;
-    public static final int REST_RESPONSE_CODE_OK = 0;
-    private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
-    private static final String API_PREFIX = "/api";
-    private static final String SCHEMA = "_schema";
-    private static final String QUERY_PLAN = "_query_plan";
-    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
-    @Deprecated private static final String BACKENDS = 
"/rest/v1/system?path=//backends";
     private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
-    private static final String FE_LOGIN = "/rest/v1/login";
-    private static final String BASE_URL = "http://%s%s";
 
     private static String send(DorisConfig dorisConfig, HttpRequestBase 
request, Logger logger)
             throws DorisConnectorException {
@@ -133,7 +113,7 @@ public class RestService implements Serializable {
                                     dorisConfig.getPassword(),
                                     logger);
                 }
-                if (response == null) {
+                if (StringUtils.isEmpty(response)) {
                     logger.warn(
                             "Failed to get response from Doris FE {}, http 
code is {}",
                             request.getURI(),
@@ -233,43 +213,6 @@ public class RestService implements Serializable {
         return result.toString();
     }
 
-    @VisibleForTesting
-    static String[] parseIdentifier(String tableIdentifier, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Parse identifier '{}'.", tableIdentifier);
-        if (StringUtils.isEmpty(tableIdentifier)) {
-            String errMsg =
-                    String.format(
-                            ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
-                            "table.identifier",
-                            tableIdentifier);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        String[] identifier = tableIdentifier.split("\\.");
-        if (identifier.length != 2) {
-            String errMsg =
-                    String.format(
-                            ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
-                            "table.identifier",
-                            tableIdentifier);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        return identifier;
-    }
-
-    @VisibleForTesting
-    static String randomEndpoint(String feNodes, Logger logger) throws 
DorisConnectorException {
-        logger.trace("Parse fenodes '{}'.", feNodes);
-        if (StringUtils.isEmpty(feNodes)) {
-            String errMsg =
-                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, 
"fenodes", feNodes);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        List<String> nodes = Arrays.asList(feNodes.split(","));
-        Collections.shuffle(nodes);
-        return nodes.get(0).trim();
-    }
-
     @VisibleForTesting
     static List<String> allEndpoints(String feNodes, Logger logger) throws 
DorisConnectorException {
         logger.trace("Parse fenodes '{}'.", feNodes);
@@ -286,7 +229,7 @@ public class RestService implements Serializable {
 
     @VisibleForTesting
     public static String randomBackend(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException, IOException {
+            throws DorisConnectorException {
         List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig, 
logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
@@ -311,59 +254,9 @@ public class RestService implements Serializable {
         }
     }
 
-    @Deprecated
-    @VisibleForTesting
-    static List<BackendRow> getBackends(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException, IOException {
-        String feNodes = dorisConfig.getFrontends();
-        String feNode = randomEndpoint(feNodes, logger);
-        String beUrl = String.format(BASE_URL, feNode, BACKENDS);
-        HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(dorisConfig, httpGet, logger);
-        logger.info("Backend Info:{}", response);
-        List<BackendRow> backends = parseBackend(response, logger);
-        return backends;
-    }
-
-    @Deprecated
-    static List<BackendRow> parseBackend(String response, Logger logger)
-            throws DorisConnectorException, IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        Backend backend;
-        try {
-            backend = mapper.readValue(response, Backend.class);
-        } catch (JsonParseException e) {
-            String errMsg = "Doris BE's response is not a json. res: " + 
response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (JsonMappingException e) {
-            String errMsg = "Doris BE's response cannot map to schema. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (IOException e) {
-            String errMsg = "Parse Doris BE's response to json failed. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-
-        if (backend == null) {
-            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
-                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-        }
-        List<BackendRow> backendRows =
-                backend.getRows().stream().filter(v -> 
v.getAlive()).collect(Collectors.toList());
-        logger.debug("Parsing schema result is '{}'.", backendRows);
-        return backendRows;
-    }
-
     @VisibleForTesting
     public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig 
dorisConfig, Logger logger)
-            throws DorisConnectorException, IOException {
+            throws DorisConnectorException {
         String feNodes = dorisConfig.getFrontends();
         List<String> feNodeList = allEndpoints(feNodes, logger);
         for (String feNode : feNodeList) {
@@ -385,7 +278,7 @@ public class RestService implements Serializable {
     }
 
     static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger 
logger)
-            throws DorisConnectorException, IOException {
+            throws DorisConnectorException {
         ObjectMapper mapper = new ObjectMapper();
         BackendV2 backend;
         try {
@@ -416,262 +309,4 @@ public class RestService implements Serializable {
         logger.debug("Parsing schema result is '{}'.", backendRows);
         return backendRows;
     }
-
-    @VisibleForTesting
-    static String getUriStr(DorisConfig dorisConfig, Logger logger) throws 
DorisConnectorException {
-        String[] identifier = 
parseIdentifier(dorisConfig.getTableIdentifier(), logger);
-        return "http://"
-                + randomEndpoint(dorisConfig.getFrontends(), logger)
-                + API_PREFIX
-                + "/"
-                + identifier[0]
-                + "/"
-                + identifier[1]
-                + "/";
-    }
-
-    public static Schema getSchema(DorisConfig dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Finding schema.");
-        HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA);
-        String response = send(dorisConfig, httpGet, logger);
-        logger.debug("Find schema response is '{}'.", response);
-        return parseSchema(response, logger);
-    }
-
-    public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger 
logger)
-            throws DorisConnectorException {
-        try {
-            return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig, 
logger).getKeysType());
-        } catch (Exception e) {
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
-        }
-    }
-
-    @VisibleForTesting
-    public static Schema parseSchema(String response, Logger logger)
-            throws DorisConnectorException {
-        logger.trace("Parse response '{}' to schema.", response);
-        ObjectMapper mapper = new ObjectMapper();
-        Schema schema;
-        try {
-            schema = mapper.readValue(response, Schema.class);
-        } catch (JsonParseException e) {
-            String errMsg = "Doris FE's response is not a json. res: " + 
response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (JsonMappingException e) {
-            String errMsg = "Doris FE's response cannot map to schema. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (IOException e) {
-            String errMsg = "Parse Doris FE's response to json failed. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-
-        if (schema == null) {
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
-                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-        }
-
-        if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
-            String errMsg = "Doris FE's response is not OK, status is " + 
schema.getStatus();
-            logger.error(errMsg);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        logger.debug("Parsing schema result is '{}'.", schema);
-        return schema;
-    }
-
-    public static List<PartitionDefinition> findPartitions(DorisConfig 
dorisConfig, Logger logger)
-            throws DorisConnectorException {
-        String[] tableIdentifiers = 
parseIdentifier(dorisConfig.getTableIdentifier(), logger);
-        String readFields =
-                StringUtils.isBlank(dorisConfig.getReadField()) ? "*" : 
dorisConfig.getReadField();
-        String sql =
-                "select "
-                        + readFields
-                        + " from `"
-                        + tableIdentifiers[0]
-                        + "`.`"
-                        + tableIdentifiers[1]
-                        + "`";
-        if (!StringUtils.isEmpty(dorisConfig.getFilterQuery())) {
-            sql += " where " + dorisConfig.getFilterQuery();
-        }
-        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
-
-        HttpPost httpPost = new HttpPost(getUriStr(dorisConfig, logger) + 
QUERY_PLAN);
-        String entity = "{\"sql\": \"" + sql + "\"}";
-        logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
-        StringEntity stringEntity = new StringEntity(entity, 
StandardCharsets.UTF_8);
-        stringEntity.setContentEncoding("UTF-8");
-        stringEntity.setContentType("application/json");
-        httpPost.setEntity(stringEntity);
-
-        String resStr = send(dorisConfig, httpPost, logger);
-        logger.debug("Find partition response is '{}'.", resStr);
-        QueryPlan queryPlan = getQueryPlan(resStr, logger);
-        Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, 
logger);
-        return tabletsMapToPartition(
-                dorisConfig,
-                be2Tablets,
-                queryPlan.getOpaquedQueryPlan(),
-                tableIdentifiers[0],
-                tableIdentifiers[1],
-                logger);
-    }
-
-    @VisibleForTesting
-    static QueryPlan getQueryPlan(String response, Logger logger) throws 
DorisConnectorException {
-        ObjectMapper mapper = new ObjectMapper();
-        QueryPlan queryPlan;
-        try {
-            queryPlan = mapper.readValue(response, QueryPlan.class);
-        } catch (JsonParseException e) {
-            String errMsg = "Doris FE's response is not a json. res: " + 
response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (JsonMappingException e) {
-            String errMsg = "Doris FE's response cannot map to schema. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        } catch (IOException e) {
-            String errMsg = "Parse Doris FE's response to json failed. res: " 
+ response;
-            logger.error(errMsg, e);
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
-        }
-
-        if (queryPlan == null) {
-            throw new DorisConnectorException(
-                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
-                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
-        }
-
-        if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
-            String errMsg = "Doris FE's response is not OK, status is " + 
queryPlan.getStatus();
-            logger.error(errMsg);
-            throw new 
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-        }
-        logger.debug("Parsing partition result is '{}'.", queryPlan);
-        return queryPlan;
-    }
-
-    @VisibleForTesting
-    static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, 
Logger logger)
-            throws DorisConnectorException {
-        Map<String, List<Long>> be2Tablets = new HashMap<>();
-        for (Map.Entry<String, Tablet> part : 
queryPlan.getPartitions().entrySet()) {
-            logger.debug("Parse tablet info: '{}'.", part);
-            long tabletId;
-            try {
-                tabletId = Long.parseLong(part.getKey());
-            } catch (NumberFormatException e) {
-                String errMsg = "Parse tablet id '" + part.getKey() + "' to 
long failed.";
-                logger.error(errMsg, e);
-                throw new DorisConnectorException(
-                        DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, 
e);
-            }
-            String target = null;
-            int tabletCount = Integer.MAX_VALUE;
-            for (String candidate : part.getValue().getRoutings()) {
-                logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", 
candidate, tabletId);
-                if (!be2Tablets.containsKey(candidate)) {
-                    logger.debug(
-                            "Choice a new Doris BE '{}' for tablet '{}'.", 
candidate, tabletId);
-                    List<Long> tablets = new ArrayList<>();
-                    be2Tablets.put(candidate, tablets);
-                    target = candidate;
-                    break;
-                } else {
-                    if (be2Tablets.get(candidate).size() < tabletCount) {
-                        target = candidate;
-                        tabletCount = be2Tablets.get(candidate).size();
-                        logger.debug(
-                                "Current candidate Doris BE to tablet '{}' is 
'{}' with tablet count {}.",
-                                tabletId,
-                                target,
-                                tabletCount);
-                    }
-                }
-            }
-            if (target == null) {
-                String errMsg = "Cannot choice Doris BE for tablet " + 
tabletId;
-                logger.error(errMsg);
-                throw new DorisConnectorException(
-                        DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
-            }
-
-            logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, 
tabletId);
-            be2Tablets.get(target).add(tabletId);
-        }
-        return be2Tablets;
-    }
-
-    @VisibleForTesting
-    static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger 
logger) {
-        int tabletsSize = DorisOptions.DORIS_TABLET_SIZE_DEFAULT;
-        if (dorisConfig.getTabletSize() != null) {
-            tabletsSize = dorisConfig.getTabletSize();
-        }
-        if (tabletsSize < DorisOptions.DORIS_TABLET_SIZE_MIN) {
-            logger.warn(
-                    "{} is less than {}, set to default value {}.",
-                    DorisOptions.DORIS_TABLET_SIZE,
-                    DorisOptions.DORIS_TABLET_SIZE_MIN,
-                    DorisOptions.DORIS_TABLET_SIZE_MIN);
-            tabletsSize = DorisOptions.DORIS_TABLET_SIZE_MIN;
-        }
-        logger.debug("Tablet size is set to {}.", tabletsSize);
-        return tabletsSize;
-    }
-
-    @VisibleForTesting
-    static List<PartitionDefinition> tabletsMapToPartition(
-            DorisConfig dorisConfig,
-            Map<String, List<Long>> be2Tablets,
-            String opaquedQueryPlan,
-            String database,
-            String table,
-            Logger logger)
-            throws DorisConnectorException {
-        int tabletsSize = tabletCountLimitForOnePartition(dorisConfig, logger);
-        List<PartitionDefinition> partitions = new ArrayList<>();
-        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
-            logger.debug("Generate partition with beInfo: '{}'.", beInfo);
-            HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
-            beInfo.getValue().clear();
-            beInfo.getValue().addAll(tabletSet);
-            int first = 0;
-            while (first < beInfo.getValue().size()) {
-                Set<Long> partitionTablets =
-                        new HashSet<>(
-                                beInfo.getValue()
-                                        .subList(
-                                                first,
-                                                Math.min(
-                                                        
beInfo.getValue().size(),
-                                                        first + tabletsSize)));
-                first = first + tabletsSize;
-                PartitionDefinition partitionDefinition =
-                        new PartitionDefinition(
-                                database,
-                                table,
-                                beInfo.getKey(),
-                                partitionTablets,
-                                opaquedQueryPlan);
-                logger.debug("Generate one PartitionDefinition '{}'.", 
partitionDefinition);
-                partitions.add(partitionDefinition);
-            }
-        }
-        return partitions;
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
deleted file mode 100644
index f151a0e727..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/** Be response model */
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Backend {
-
-    @JsonProperty(value = "rows")
-    private List<BackendRow> rows;
-
-    public List<BackendRow> getRows() {
-        return rows;
-    }
-
-    public void setRows(List<BackendRow> rows) {
-        this.rows = rows;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
deleted file mode 100644
index fe2260bea7..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
-@Getter
-@Setter
-@ToString
-@Deprecated
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendRow {
-
-    @JsonProperty(value = "HttpPort")
-    private String httpPort;
-
-    @JsonProperty(value = "IP")
-    private String ip;
-
-    @JsonProperty(value = "Alive")
-    private Boolean alive;
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
deleted file mode 100644
index 8c9d00d01a..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.Objects;
-
-public class Field {
-    private String name;
-    private String type;
-    private String comment;
-    private int precision;
-    private int scale;
-    private String aggregationType;
-
-    public Field() {}
-
-    public Field(
-            String name,
-            String type,
-            String comment,
-            int precision,
-            int scale,
-            String aggregationType) {
-        this.name = name;
-        this.type = type;
-        this.comment = comment;
-        this.precision = precision;
-        this.scale = scale;
-        this.aggregationType = aggregationType;
-    }
-
-    public String getAggregationType() {
-        return aggregationType;
-    }
-
-    public void setAggregationType(String aggregationType) {
-        this.aggregationType = aggregationType;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getComment() {
-        return comment;
-    }
-
-    public void setComment(String comment) {
-        this.comment = comment;
-    }
-
-    public int getPrecision() {
-        return precision;
-    }
-
-    public void setPrecision(int precision) {
-        this.precision = precision;
-    }
-
-    public int getScale() {
-        return scale;
-    }
-
-    public void setScale(int scale) {
-        this.scale = scale;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Field field = (Field) o;
-        return precision == field.precision
-                && scale == field.scale
-                && Objects.equals(name, field.name)
-                && Objects.equals(type, field.type)
-                && Objects.equals(comment, field.comment);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, type, comment, precision, scale);
-    }
-
-    @Override
-    public String toString() {
-        return "Field{"
-                + "name='"
-                + name
-                + '\''
-                + ", type='"
-                + type
-                + '\''
-                + ", comment='"
-                + comment
-                + '\''
-                + ", precision="
-                + precision
-                + ", scale="
-                + scale
-                + '}';
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
deleted file mode 100644
index d59c6124cd..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.Map;
-import java.util.Objects;
-
-public class QueryPlan {
-    private int status;
-    private String opaquedQueryPlan;
-    private Map<String, Tablet> partitions;
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
-    public String getOpaquedQueryPlan() {
-        return opaquedQueryPlan;
-    }
-
-    public void setOpaquedQueryPlan(String opaquedQueryPlan) {
-        this.opaquedQueryPlan = opaquedQueryPlan;
-    }
-
-    public Map<String, Tablet> getPartitions() {
-        return partitions;
-    }
-
-    public void setPartitions(Map<String, Tablet> partitions) {
-        this.partitions = partitions;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        QueryPlan queryPlan = (QueryPlan) o;
-        return status == queryPlan.status
-                && Objects.equals(opaquedQueryPlan, queryPlan.opaquedQueryPlan)
-                && Objects.equals(partitions, queryPlan.partitions);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(status, opaquedQueryPlan, partitions);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
deleted file mode 100644
index 60e06bba97..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class Schema {
-    private int status = 0;
-    private String keysType;
-    private List<Field> properties;
-
-    public Schema() {
-        properties = new ArrayList<>();
-    }
-
-    public Schema(int fieldCount) {
-        properties = new ArrayList<>(fieldCount);
-    }
-
-    public int getStatus() {
-        return status;
-    }
-
-    public void setStatus(int status) {
-        this.status = status;
-    }
-
-    public String getKeysType() {
-        return keysType;
-    }
-
-    public void setKeysType(String keysType) {
-        this.keysType = keysType;
-    }
-
-    public List<Field> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(List<Field> properties) {
-        this.properties = properties;
-    }
-
-    public void put(
-            String name,
-            String type,
-            String comment,
-            int scale,
-            int precision,
-            String aggregationType) {
-        properties.add(new Field(name, type, comment, scale, precision, aggregationType));
-    }
-
-    public void put(Field f) {
-        properties.add(f);
-    }
-
-    public Field get(int index) {
-        if (index >= properties.size()) {
-            throw new IndexOutOfBoundsException(
-                    "Index: " + index + ", Fields size:" + properties.size());
-        }
-        return properties.get(index);
-    }
-
-    public int size() {
-        return properties.size();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Schema schema = (Schema) o;
-        return status == schema.status && Objects.equals(properties, schema.properties);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(status, properties);
-    }
-
-    @Override
-    public String toString() {
-        return "Schema{" + "status=" + status + ", properties=" + properties + '}';
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
deleted file mode 100644
index cb52490997..0000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import java.util.List;
-import java.util.Objects;
-
-public class Tablet {
-    private List<String> routings;
-    private int version;
-    private long versionHash;
-    private long schemaHash;
-
-    public List<String> getRoutings() {
-        return routings;
-    }
-
-    public void setRoutings(List<String> routings) {
-        this.routings = routings;
-    }
-
-    public int getVersion() {
-        return version;
-    }
-
-    public void setVersion(int version) {
-        this.version = version;
-    }
-
-    public long getVersionHash() {
-        return versionHash;
-    }
-
-    public void setVersionHash(long versionHash) {
-        this.versionHash = versionHash;
-    }
-
-    public long getSchemaHash() {
-        return schemaHash;
-    }
-
-    public void setSchemaHash(long schemaHash) {
-        this.schemaHash = schemaHash;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Tablet tablet = (Tablet) o;
-        return version == tablet.version
-                && versionHash == tablet.versionHash
-                && schemaHash == tablet.schemaHash
-                && Objects.equals(routings, tablet.routings);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(routings, version, versionHash, schemaHash);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 8a45e34cef..e0ef8e1893 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -18,17 +18,24 @@
 package org.apache.seatunnel.connectors.doris.sink;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
 import org.apache.seatunnel.api.sink.SaveModeHandler;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.sink.SupportSaveMode;
+import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
@@ -41,17 +48,22 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
+
 public class DorisSink
         implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo>,
-                SupportSaveMode {
+                SupportSaveMode,
+                SupportMultiTableSink {
 
     private final DorisConfig dorisConfig;
-    private final SeaTunnelRowType seaTunnelRowType;
+    private final ReadonlyConfig config;
+    private final CatalogTable catalogTable;
     private String jobId;
 
     public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
+        this.config = config;
+        this.catalogTable = catalogTable;
         this.dorisConfig = DorisConfig.of(config);
-        this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
     }
 
     @Override
@@ -68,13 +80,13 @@ public class DorisSink
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
             SinkWriter.Context context) throws IOException {
         return new DorisSinkWriter(
-                context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId);
+                context, Collections.emptyList(), catalogTable, dorisConfig, jobId);
     }
 
     @Override
     public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
             SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
-        return new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId);
+        return new DorisSinkWriter(context, states, catalogTable, dorisConfig, jobId);
     }
 
     @Override
@@ -94,6 +106,27 @@ public class DorisSink
 
     @Override
     public Optional<SaveModeHandler> getSaveModeHandler() {
-        return Optional.empty();
+        CatalogFactory catalogFactory =
+                discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CatalogFactory.class,
+                        "Doris");
+        if (catalogFactory == null) {
+            throw new DorisConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, "Cannot find Doris catalog factory"));
+        }
+
+        Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
+        catalog.open();
+        return Optional.of(
+                new DefaultSaveModeHandler(
+                        config.get(DorisOptions.SCHEMA_SAVE_MODE),
+                        config.get(DorisOptions.DATA_SAVE_MODE),
+                        catalog,
+                        catalogTable,
+                        config.get(DorisOptions.CUSTOM_SQL)));
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 6c001c6244..35b0b8da5e 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.doris.sink;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -28,9 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
+import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
+
+import org.apache.commons.lang3.StringUtils;
 
 import com.google.auto.service.AutoService;
 
+import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY;
+import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY;
+import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY;
+import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
+import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
+import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
+import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
+
 @AutoService(Factory.class)
 public class DorisSinkFactory
         implements TableSinkFactory<
@@ -49,7 +61,50 @@ public class DorisSinkFactory
     public TableSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo> createSink(
             TableSinkFactoryContext context) {
         ReadonlyConfig config = context.getOptions();
-        CatalogTable catalogTable = context.getCatalogTable();
-        return () -> new DorisSink(config, catalogTable);
+        CatalogTable catalogTable =
+                config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)
+                        ? UnsupportedTypeConverterUtils.convertCatalogTable(
+                                context.getCatalogTable())
+                        : context.getCatalogTable();
+        final CatalogTable finalCatalogTable = this.renameCatalogTable(config, catalogTable);
+        return () -> new DorisSink(config, finalCatalogTable);
+    }
+
+    private CatalogTable renameCatalogTable(ReadonlyConfig options, CatalogTable catalogTable) {
+        TableIdentifier tableId = catalogTable.getTableId();
+        String tableName;
+        String databaseName;
+        String tableIdentifier = options.get(TABLE_IDENTIFIER);
+        if (StringUtils.isNotEmpty(tableIdentifier)) {
+            tableName = tableIdentifier.split("\\.")[1];
+            databaseName = tableIdentifier.split("\\.")[0];
+        } else {
+            if (StringUtils.isNotEmpty(options.get(TABLE))) {
+                tableName = replaceName(options.get(TABLE), tableId);
+            } else {
+                tableName = tableId.getTableName();
+            }
+            if (StringUtils.isNotEmpty(options.get(DATABASE))) {
+                databaseName = replaceName(options.get(DATABASE), tableId);
+            } else {
+                databaseName = tableId.getDatabaseName();
+            }
+        }
+        TableIdentifier newTableId =
+                TableIdentifier.of(tableId.getCatalogName(), databaseName, null, tableName);
+        return CatalogTable.of(newTableId, catalogTable);
+    }
+
+    private String replaceName(String original, TableIdentifier tableId) {
+        if (tableId.getTableName() != null) {
+            original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName());
+        }
+        if (tableId.getSchemaName() != null) {
+            original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName());
+        }
+        if (tableId.getDatabaseName() != null) {
+            original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName());
+        }
+        return original;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
index 03179f92e5..c3c5725b32 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
@@ -22,11 +22,13 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import java.io.Serializable;
+
 @Setter
 @Getter
 @ToString
 @EqualsAndHashCode
-public class DorisSinkState {
+public class DorisSinkState implements Serializable {
     private final String labelPrefix;
     private final long checkpointId;
 
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 2f35357e5b..0abdc6269c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.doris.sink.writer;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
@@ -31,6 +33,7 @@ import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.util.HttpUtil;
+import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -53,7 +56,9 @@ import java.util.concurrent.TimeUnit;
 import static com.google.common.base.Preconditions.checkState;
 
 @Slf4j
-public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> {
+public class DorisSinkWriter
+        implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>,
+                SupportMultiTableSinkWriter<Void> {
     private static final int INITIAL_DELAY = 200;
     private static final int CONNECT_TIMEOUT = 1000;
     private static final List<String> DORIS_SUCCESS_STATUS =
@@ -66,6 +71,7 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
     private final LabelGenerator labelGenerator;
     private final int intervalTime;
     private final DorisSerializer serializer;
+    private final CatalogTable catalogTable;
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
@@ -75,21 +81,28 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
     public DorisSinkWriter(
             SinkWriter.Context context,
             List<DorisSinkState> state,
-            SeaTunnelRowType seaTunnelRowType,
+            CatalogTable catalogTable,
             DorisConfig dorisConfig,
             String jobId)
             throws IOException {
         this.dorisConfig = dorisConfig;
+        this.catalogTable = catalogTable;
         this.lastCheckpointId = !state.isEmpty() ? state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
         log.info("labelPrefix " + dorisConfig.getLabelPrefix());
         this.labelPrefix =
-                dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask();
+                dorisConfig.getLabelPrefix()
+                        + "_"
+                        + catalogTable.getTablePath().getFullName().replaceAll("\\.", "_")
+                        + "_"
+                        + jobId
+                        + "_"
+                        + context.getIndexOfSubtask();
         this.labelGenerator = new LabelGenerator(labelPrefix, 
dorisConfig.getEnable2PC());
         this.scheduledExecutorService =
                 new ScheduledThreadPoolExecutor(
                         1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
-        this.serializer = createSerializer(dorisConfig, seaTunnelRowType);
+        this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType());
         this.intervalTime = dorisConfig.getCheckInterval();
         this.loading = false;
         this.initializeLoad();
@@ -101,7 +114,11 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo
         try {
             this.dorisStreamLoad =
                     new DorisStreamLoad(
-                            backend, dorisConfig, labelGenerator, new HttpUtil().getHttpClient());
+                            backend,
+                            catalogTable.getTablePath(),
+                            dorisConfig,
+                            labelGenerator,
+                            new HttpUtil().getHttpClient());
             if (dorisConfig.getEnable2PC()) {
                 dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
             }
@@ -120,7 +137,11 @@ public class DorisSinkWriter implements 
SinkWriter<SeaTunnelRow, DorisCommitInfo
     @Override
     public void write(SeaTunnelRow element) throws IOException {
         checkLoadException();
-        byte[] serialize = serializer.serialize(element);
+        byte[] serialize =
+                serializer.serialize(
+                        dorisConfig.isNeedsUnsupportedTypeCasting()
+                                ? 
UnsupportedTypeConverterUtils.convertRow(element)
+                                : element);
         if (Objects.isNull(serialize)) {
             return;
         }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 892c1da954..00f21e9ae5 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.doris.sink.writer;
 
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
@@ -82,13 +83,13 @@ public class DorisStreamLoad implements Serializable {
 
     public DorisStreamLoad(
             String hostPort,
+            TablePath tablePath,
             DorisConfig dorisConfig,
             LabelGenerator labelGenerator,
             CloseableHttpClient httpClient) {
         this.hostPort = hostPort;
-        String[] tableInfo = dorisConfig.getTableIdentifier().split("\\.");
-        this.db = tableInfo[0];
-        this.table = tableInfo[1];
+        this.db = tablePath.getDatabaseName();
+        this.table = tablePath.getTableName();
         this.user = dorisConfig.getUsername();
         this.passwd = dorisConfig.getPassword();
         this.labelGenerator = labelGenerator;
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index fc1884efcf..3f890af855 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -34,11 +35,14 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class DorisCatalogUtil {
 
     public static final String ALL_DATABASES_QUERY =
@@ -109,10 +113,22 @@ public class DorisCatalogUtil {
                             .map(r -> "`" + r + "`")
                             .collect(Collectors.joining(","));
         }
+        String uniqueKey = "";
+        if (!tableSchema.getConstraintKeys().isEmpty()) {
+            uniqueKey =
+                    tableSchema.getConstraintKeys().stream()
+                            .flatMap(c -> c.getColumnNames().stream())
+                            .map(r -> "`" + r.getColumnName() + "`")
+                            .collect(Collectors.joining(","));
+        }
         template =
                 template.replaceAll(
                         String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_PRIMARY_KEY),
                         primaryKey);
+        template =
+                template.replaceAll(
+                        String.format("\\$\\{%s\\}", 
SaveModeConstants.ROWTYPE_UNIQUE_KEY),
+                        uniqueKey);
         Map<String, CreateTableParser.ColumnInfo> columnInTemplate =
                 CreateTableParser.getColumnList(template);
         template = mergeColumnInTemplate(columnInTemplate, tableSchema, 
template);
@@ -120,15 +136,7 @@ public class DorisCatalogUtil {
         String rowTypeFields =
                 tableSchema.getColumns().stream()
                         .filter(column -> 
!columnInTemplate.containsKey(column.getName()))
-                        .map(
-                                column ->
-                                        String.format(
-                                                "`%s` %s %s ",
-                                                column.getName(),
-                                                fromSeaTunnelType(
-                                                        column.getDataType(),
-                                                        
column.getColumnLength()),
-                                                column.isNullable() ? "NULL" : 
"NOT NULL"))
+                        .map(DorisCatalogUtil::columnToDorisType)
                         .collect(Collectors.joining(",\n"));
         return template.replaceAll(
                         String.format("\\$\\{%s\\}", 
SaveModeConstants.DATABASE),
@@ -149,18 +157,18 @@ public class DorisCatalogUtil {
         Map<String, Column> columnMap =
                 tableSchema.getColumns().stream()
                         .collect(Collectors.toMap(Column::getName, 
Function.identity()));
-        for (String col : columnInTemplate.keySet()) {
-            CreateTableParser.ColumnInfo columnInfo = 
columnInTemplate.get(col);
+        List<CreateTableParser.ColumnInfo> columnInfosInSeq =
+                columnInTemplate.values().stream()
+                        .sorted(
+                                Comparator.comparingInt(
+                                        
CreateTableParser.ColumnInfo::getStartIndex))
+                        .collect(Collectors.toList());
+        for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) {
+            String col = columnInfo.getName();
             if (StringUtils.isEmpty(columnInfo.getInfo())) {
                 if (columnMap.containsKey(col)) {
                     Column column = columnMap.get(col);
-                    String newCol =
-                            String.format(
-                                    "`%s` %s %s ",
-                                    column.getName(),
-                                    fromSeaTunnelType(
-                                            column.getDataType(), 
column.getColumnLength()),
-                                    column.isNullable() ? "NULL" : "NOT NULL");
+                    String newCol = columnToDorisType(column);
                     String prefix = template.substring(0, 
columnInfo.getStartIndex() + offset);
                     String suffix = template.substring(offset + 
columnInfo.getEndIndex());
                     if (prefix.endsWith("`")) {
@@ -181,6 +189,21 @@ public class DorisCatalogUtil {
         return template;
     }
 
+    private static String columnToDorisType(Column column) {
+        checkNotNull(column, "The column is required.");
+        return String.format(
+                "`%s` %s %s ",
+                column.getName(),
+                fromSeaTunnelType(
+                        column.getDataType(),
+                        Math.max(
+                                column.getColumnLength() == null ? 0 : 
column.getColumnLength(),
+                                column.getLongColumnLength() == null
+                                        ? 0
+                                        : column.getLongColumnLength())),
+                column.isNullable() ? "NULL" : "NOT NULL");
+    }
+
     public static SeaTunnelDataType<?> fromDorisType(ResultSet rs) throws 
SQLException {
 
         String type = rs.getString(5).toUpperCase();
@@ -214,6 +237,7 @@ public class DorisCatalogUtil {
                 return LocalTimeType.LOCAL_DATE_TIME_TYPE;
             case "DECIMAL":
             case "DECIMALV2":
+            case "DECIMALV3":
                 int precision = rs.getInt(8);
                 int scale = rs.getInt(9);
                 return new DecimalType(precision, scale);
@@ -233,14 +257,16 @@ public class DorisCatalogUtil {
         }
     }
 
-    private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType, 
Integer columnLength) {
+    private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType, 
Long columnLength) {
 
         switch (dataType.getSqlType()) {
             case STRING:
-                if (columnLength != null && columnLength > 65533) {
-                    return "STRING";
+                if (columnLength != null && columnLength <= 65533 && 
columnLength > 0) {
+                    return String.format("VARCHAR(%d)", columnLength);
                 }
-                return String.format("VARCHAR(%d)", columnLength);
+                return "STRING";
+            case BYTES:
+                return "STRING";
             case NULL:
                 return "NULL_TYPE";
             case BOOLEAN:
@@ -260,13 +286,21 @@ public class DorisCatalogUtil {
                 return String.format(
                         "DECIMALV3(%d,%d)", decimalType.getPrecision(), 
decimalType.getScale());
             case TIME:
-                return "TIME";
+                return "VARCHAR(8)";
             case DATE:
                 return "DATEV2";
             case TIMESTAMP:
                 return "DATETIME";
+            case ARRAY:
+                return "ARRAY<"
+                        + fromSeaTunnelType(
+                                ((ArrayType<?, ?>) dataType).getElementType(), 
Long.MAX_VALUE)
+                        + ">";
+            case MAP:
             case ROW:
                 return "JSONB";
+            case TINYINT:
+                return "TINYINT";
             default:
                 throw new CatalogException(String.format("Unsupported doris 
type: %s", dataType));
         }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
deleted file mode 100644
index 3e914d7d2a..0000000000
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.util;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.Properties;
-
-public class IOUtils {
-    public static String propsToString(Properties props) throws 
IllegalArgumentException {
-        StringWriter sw = new StringWriter();
-        if (props != null) {
-            try {
-                props.store(sw, "");
-            } catch (IOException ex) {
-                throw new IllegalArgumentException("Cannot parse props to 
String.", ex);
-            }
-        }
-        return sw.toString();
-    }
-
-    public static Properties propsFromString(String source) throws 
IllegalArgumentException {
-        Properties copy = new Properties();
-        if (source != null) {
-            try {
-                copy.load(new StringReader(source));
-            } catch (IOException ex) {
-                throw new IllegalArgumentException("Cannot parse props from 
String.", ex);
-            }
-        }
-        return copy;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
new file mode 100644
index 0000000000..bcf0ed534f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.util;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+
+public class UnsupportedTypeConverterUtils {
+    public static Object convertBigDecimal(BigDecimal bigDecimal) {
+        if (bigDecimal.precision() > 38) {
+            return bigDecimal.doubleValue();
+        }
+        return bigDecimal;
+    }
+
+    public static SeaTunnelRow convertRow(SeaTunnelRow row) {
+        List<Object> newValues =
+                Arrays.stream(row.getFields())
+                        .map(
+                                value -> {
+                                    if (value instanceof BigDecimal) {
+                                        return convertBigDecimal((BigDecimal) 
value);
+                                    }
+                                    return value;
+                                })
+                        .collect(Collectors.toList());
+        return new SeaTunnelRow(newValues.toArray());
+    }
+
+    public static CatalogTable convertCatalogTable(CatalogTable catalogTable) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+        List<Column> columns = tableSchema.getColumns();
+        List<Column> newColumns =
+                columns.stream()
+                        .map(
+                                column -> {
+                                    if 
(column.getDataType().getSqlType().equals(SqlType.DECIMAL)) {
+                                        DecimalType decimalType =
+                                                (DecimalType) 
column.getDataType();
+                                        if (decimalType.getPrecision() > 38) {
+                                            return PhysicalColumn.of(
+                                                    column.getName(),
+                                                    DOUBLE_TYPE,
+                                                    22,
+                                                    column.isNullable(),
+                                                    null,
+                                                    column.getComment(),
+                                                    "DOUBLE",
+                                                    false,
+                                                    false,
+                                                    0L,
+                                                    column.getOptions(),
+                                                    22L);
+                                        }
+                                    }
+                                    return column;
+                                })
+                        .collect(Collectors.toList());
+        TableSchema newtableSchema =
+                TableSchema.builder()
+                        .columns(newColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                newtableSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
new file mode 100644
index 0000000000..404351090e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class DorisCreateTableTest {
+
+    @Test
+    public void test() {
+
+        List<Column> columns = new ArrayList<>();
+
+        columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true, 
null, ""));
+        columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null, 
true, null, ""));
+        columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, 
null, ""));
+        columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, null, true, 
null, ""));
+        columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, null, 
true, null, ""));
+        columns.add(PhysicalColumn.of("create_time", BasicType.LONG_TYPE, 
null, true, null, ""));
+
+        String result =
+                DorisCatalogUtil.getCreateTableStatement(
+                        "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (                                                 
                                                                                
                  \n"
+                                + "${rowtype_primary_key}  ,       \n"
+                                + "${rowtype_unique_key} , \n"
+                                + "`create_time` DATETIME NOT NULL ,  \n"
+                                + "${rowtype_fields}  \n"
+                                + ") ENGINE=OLAP  \n"
+                                + "PRIMARY 
KEY(${rowtype_primary_key},`create_time`)  \n"
+                                + "PARTITION BY RANGE (`create_time`)(  \n"
+                                + "   PARTITION p20230329 VALUES LESS THAN 
(\"2023-03-29\")                                                                
                                                                                
           \n"
+                                + ")                                      \n"
+                                + "DISTRIBUTED BY HASH 
(${rowtype_primary_key})  \n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
+                                + "\"in_memory\" = \"false\",\n"
+                                + "\"storage_format\" = \"V2\",\n"
+                                + "\"disable_auto_compaction\" = \"false\"\n"
+                                + ")",
+                        TablePath.of("test1.test2"),
+                        CatalogTable.of(
+                                TableIdentifier.of("test", "test1", "test2"),
+                                TableSchema.builder()
+                                        .primaryKey(PrimaryKey.of("", 
Arrays.asList("id", "age")))
+                                        .constraintKey(
+                                                Arrays.asList(
+                                                        ConstraintKey.of(
+                                                                
ConstraintKey.ConstraintType
+                                                                        
.UNIQUE_KEY,
+                                                                "unique_key",
+                                                                
Collections.singletonList(
+                                                                        
ConstraintKey
+                                                                               
 .ConstraintKeyColumn
+                                                                               
 .of(
+                                                                               
         "name",
+                                                                               
         ConstraintKey
+                                                                               
                 .ColumnSortType
+                                                                               
                 .DESC))),
+                                                        ConstraintKey.of(
+                                                                
ConstraintKey.ConstraintType
+                                                                        
.UNIQUE_KEY,
+                                                                "unique_key2",
+                                                                
Collections.singletonList(
+                                                                        
ConstraintKey
+                                                                               
 .ConstraintKeyColumn
+                                                                               
 .of(
+                                                                               
         "score",
+                                                                               
         ConstraintKey
+                                                                               
                 .ColumnSortType
+                                                                               
                 .ASC)))))
+                                        .columns(columns)
+                                        .build(),
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                ""));
+        Assertions.assertEquals(
+                result,
+                "CREATE TABLE IF NOT EXISTS `test1`.`test2` (                  
                                                                                
                                                 \n"
+                        + "`id` BIGINT(1) NULL ,`age` INT(1) NULL   ,       \n"
+                        + "`name` STRING NULL ,`score` INT(1) NULL  , \n"
+                        + "`create_time` DATETIME NOT NULL ,  \n"
+                        + "`gender` TINYINT NULL   \n"
+                        + ") ENGINE=OLAP  \n"
+                        + "PRIMARY KEY(`id`,`age`,`create_time`)  \n"
+                        + "PARTITION BY RANGE (`create_time`)(  \n"
+                        + "   PARTITION p20230329 VALUES LESS THAN 
(\"2023-03-29\")                                                                
                                                                                
           \n"
+                        + ")                                      \n"
+                        + "DISTRIBUTED BY HASH (`id`,`age`)  \n"
+                        + "PROPERTIES (\n"
+                        + "\"replication_allocation\" = 
\"tag.location.default: 1\",\n"
+                        + "\"in_memory\" = \"false\",\n"
+                        + "\"storage_format\" = \"V2\",\n"
+                        + "\"disable_auto_compaction\" = \"false\"\n"
+                        + ")");
+    }
+
+    @Test
+    public void testInSeq() {
+
+        List<Column> columns = new ArrayList<>();
+
+        columns.add(PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, null, 
false, null, ""));
+        columns.add(PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, null, 
false, null, ""));
+        columns.add(PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, null, 
false, null, ""));
+        columns.add(PhysicalColumn.of("L_LINENUMBER", BasicType.INT_TYPE, 
null, false, null, ""));
+        columns.add(PhysicalColumn.of("L_QUANTITY", new DecimalType(15, 2), 
null, false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_EXTENDEDPRICE", new DecimalType(15, 2), null, 
false, null, ""));
+        columns.add(PhysicalColumn.of("L_DISCOUNT", new DecimalType(15, 2), 
null, false, null, ""));
+        columns.add(PhysicalColumn.of("L_TAX", new DecimalType(15, 2), null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_RETURNFLAG", BasicType.STRING_TYPE, null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_LINESTATUS", BasicType.STRING_TYPE, null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_COMMITDATE", LocalTimeType.LOCAL_DATE_TYPE, null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of(
+                        "L_RECEIPTDATE", LocalTimeType.LOCAL_DATE_TYPE, null, 
false, null, ""));
+        columns.add(
+                PhysicalColumn.of("L_SHIPINSTRUCT", BasicType.STRING_TYPE, 
null, false, null, ""));
+        columns.add(PhysicalColumn.of("L_SHIPMODE", BasicType.STRING_TYPE, 
null, false, null, ""));
+        columns.add(PhysicalColumn.of("L_COMMENT", BasicType.STRING_TYPE, 
null, false, null, ""));
+
+        String result =
+                DorisCatalogUtil.getCreateTableStatement(
+                        "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (\n"
+                                + "`L_COMMITDATE`,\n"
+                                + "${rowtype_primary_key},\n"
+                                + "L_SUPPKEY BIGINT NOT NULL,\n"
+                                + "${rowtype_fields}\n"
+                                + ") ENGINE=OLAP\n"
+                                + " PRIMARY KEY (L_COMMITDATE, 
${rowtype_primary_key}, L_SUPPKEY)\n"
+                                + "DISTRIBUTED BY HASH 
(${rowtype_primary_key})"
+                                + "PROPERTIES (\n"
+                                + "    \"replication_num\" = \"1\" \n"
+                                + ")",
+                        TablePath.of("tpch", "lineitem"),
+                        CatalogTable.of(
+                                TableIdentifier.of("test", "tpch", "lineitem"),
+                                TableSchema.builder()
+                                        .primaryKey(
+                                                PrimaryKey.of(
+                                                        "",
+                                                        Arrays.asList(
+                                                                "L_ORDERKEY", 
"L_LINENUMBER")))
+                                        .columns(columns)
+                                        .build(),
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                ""));
+        String expected =
+                "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+                        + "`L_COMMITDATE` DATEV2 NOT NULL ,\n"
+                        + "`L_ORDERKEY` INT(1) NOT NULL ,`L_LINENUMBER` INT(1) 
NOT NULL ,\n"
+                        + "L_SUPPKEY BIGINT NOT NULL,\n"
+                        + "`L_PARTKEY` INT(1) NOT NULL ,\n"
+                        + "`L_QUANTITY` DECIMALV3(15,2) NOT NULL ,\n"
+                        + "`L_EXTENDEDPRICE` DECIMALV3(15,2) NOT NULL ,\n"
+                        + "`L_DISCOUNT` DECIMALV3(15,2) NOT NULL ,\n"
+                        + "`L_TAX` DECIMALV3(15,2) NOT NULL ,\n"
+                        + "`L_RETURNFLAG` STRING NOT NULL ,\n"
+                        + "`L_LINESTATUS` STRING NOT NULL ,\n"
+                        + "`L_SHIPDATE` DATEV2 NOT NULL ,\n"
+                        + "`L_RECEIPTDATE` DATEV2 NOT NULL ,\n"
+                        + "`L_SHIPINSTRUCT` STRING NOT NULL ,\n"
+                        + "`L_SHIPMODE` STRING NOT NULL ,\n"
+                        + "`L_COMMENT` STRING NOT NULL \n"
+                        + ") ENGINE=OLAP\n"
+                        + " PRIMARY KEY (L_COMMITDATE, 
`L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n"
+                        + "DISTRIBUTED BY HASH 
(`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (\n"
+                        + "    \"replication_num\" = \"1\" \n"
+                        + ")";
+        Assertions.assertEquals(result, expected);
+    }
+
+    /**
+     * Checks how STRING columns are rendered when a CREATE TABLE template is expanded by
+     * {@code DorisCatalogUtil.getCreateTableStatement}.
+     *
+     * <p>Five nullable columns are declared with primary key (id, age). Per the expected DDL
+     * below, the STRING column declared with length 500 is emitted as {@code VARCHAR(500)},
+     * while the STRING columns declared with no length or with length 70000 are emitted as
+     * {@code STRING}. The template's padding whitespace is part of the literal and is compared
+     * verbatim, so it must not be reformatted.
+     */
+    @Test
+    public void testWithVarchar() {
+
+        List<Column> columns = new ArrayList<>();
+
+        columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true, 
null, ""));
+        columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null, 
true, null, ""));
+        columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, 
null, ""));
+        columns.add(PhysicalColumn.of("comment", BasicType.STRING_TYPE, 500, 
true, null, ""));
+        columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 
70000, true, null, ""));
+
+        String result =
+                DorisCatalogUtil.getCreateTableStatement(
+                        "CREATE TABLE IF NOT EXISTS 
`${database}`.`${table_name}` (                                                 
                                                                                
                  \n"
+                                + "${rowtype_primary_key}  ,       \n"
+                                + "`create_time` DATETIME NOT NULL ,  \n"
+                                + "${rowtype_fields}  \n"
+                                + ") ENGINE=OLAP  \n"
+                                + "PRIMARY 
KEY(${rowtype_primary_key},`create_time`)  \n"
+                                + "PARTITION BY RANGE (`create_time`)(  \n"
+                                + "   PARTITION p20230329 VALUES LESS THAN 
(\"2023-03-29\")                                                                
                                                                                
           \n"
+                                + ")                                      \n"
+                                + "DISTRIBUTED BY HASH 
(${rowtype_primary_key})  \n"
+                                + "PROPERTIES (                           \n"
+                                + "    \"dynamic_partition.enable\" = 
\"true\",                                                                       
                                                                                
                \n"
+                                + "    \"dynamic_partition.time_unit\" = 
\"DAY\",                                                                        
                                                                                
             \n"
+                                + "    \"dynamic_partition.end\" = \"3\", \n"
+                                + "    \"dynamic_partition.prefix\" = \"p\"    
                                                                                
                                                                                
       \n"
+                                + ");",
+                        TablePath.of("test1", "test2"),
+                        CatalogTable.of(
+                                TableIdentifier.of("test", "test1", "test2"),
+                                TableSchema.builder()
+                                        .primaryKey(PrimaryKey.of("", 
Arrays.asList("id", "age")))
+                                        .columns(columns)
+                                        .build(),
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                ""));
+
+        // JUnit's signature is assertEquals(expected, actual); passing the expected DDL first
+        // makes a failure report label the two statements correctly.
+        Assertions.assertEquals(
+                "CREATE TABLE IF NOT EXISTS `test1`.`test2` (                  
                                                                                
                                                 \n"
+                        + "`id` BIGINT(1) NULL ,`age` INT(1) NULL   ,       \n"
+                        + "`create_time` DATETIME NOT NULL ,  \n"
+                        + "`name` STRING NULL ,\n"
+                        + "`comment` VARCHAR(500) NULL ,\n"
+                        + "`description` STRING NULL   \n"
+                        + ") ENGINE=OLAP  \n"
+                        + "PRIMARY KEY(`id`,`age`,`create_time`)  \n"
+                        + "PARTITION BY RANGE (`create_time`)(  \n"
+                        + "   PARTITION p20230329 VALUES LESS THAN 
(\"2023-03-29\")                                                                
                                                                                
           \n"
+                        + ")                                      \n"
+                        + "DISTRIBUTED BY HASH (`id`,`age`)  \n"
+                        + "PROPERTIES (                           \n"
+                        + "    \"dynamic_partition.enable\" = \"true\",        
                                                                                
                                                                               
\n"
+                        + "    \"dynamic_partition.time_unit\" = \"DAY\",      
                                                                                
                                                                               
\n"
+                        + "    \"dynamic_partition.end\" = \"3\", \n"
+                        + "    \"dynamic_partition.prefix\" = \"p\"            
                                                                                
                                                                               
\n"
+                        + ");",
+                result);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 4722b84025..275c714686 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -18,17 +18,21 @@
 package org.apache.seatunnel.e2e.connector.doris;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SupportSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
 import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -42,12 +46,32 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class DorisCatalogIT extends AbstractDorisIT {
 
     private static final String DATABASE = "test";
     private static final String SINK_TABLE = "doris_catalog_e2e";
+    // Path (DATABASE.SINK_TABLE) of the table the catalog tests operate on.
+    private static final TablePath tablePath = TablePath.of(DATABASE, SINK_TABLE);
+    // Table definition used as test input; assigned exactly once by the static block below.
+    private static final CatalogTable catalogTable;
+
+    static {
+        // Schema: key columns k1/k2 (also the primary key "pk") followed by value columns v1/v2.
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        schemaBuilder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false, 0, "k1"));
+        schemaBuilder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64, false, "", "k2"));
+        schemaBuilder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10, true, null, "v1"));
+        schemaBuilder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0, false, 0.1, "v2"));
+        schemaBuilder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2")));
+
+        TableIdentifier identifier = TableIdentifier.of("doris", tablePath);
+        TableSchema schema = schemaBuilder.build();
+        catalogTable =
+                CatalogTable.of(
+                        identifier,
+                        schema,
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "test");
+    }
+
     private DorisCatalogFactory factory;
     private DorisCatalog catalog;
 
@@ -74,7 +98,6 @@ public class DorisCatalogIT extends AbstractDorisIT {
         map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT);
         map.put(DorisOptions.USERNAME.key(), USERNAME);
         map.put(DorisOptions.PASSWORD.key(), PASSWORD);
-        map.put(DorisOptions.DEFAULT_DATABASE.key(), PASSWORD);
 
         catalog = (DorisCatalog) factory.createCatalog(catalogName, 
ReadonlyConfig.fromMap(map));
 
@@ -98,26 +121,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
             return;
         }
 
-        TablePath tablePath = TablePath.of(DATABASE, SINK_TABLE);
-
-        TableSchema.Builder builder = TableSchema.builder();
-        builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false, 
0, "k1"));
-        builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64, 
false, "", "k2"));
-        builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10, 
true, null, "v1"));
-        builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0, 
false, 0.1, "v2"));
-        builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2")));
-        CatalogTable catalogTable =
-                CatalogTable.of(
-                        TableIdentifier.of("doris", DATABASE, SINK_TABLE),
-                        builder.build(),
-                        Collections.emptyMap(),
-                        Collections.emptyList(),
-                        "test");
-
         boolean dbCreated = false;
 
         List<String> databases = catalog.listDatabases();
-        Assertions.assertEquals(databases.size(), 1);
+        Assertions.assertFalse(databases.isEmpty());
 
         if (!catalog.databaseExists(tablePath.getDatabaseName())) {
             catalog.createDatabase(tablePath, false);
@@ -129,7 +136,7 @@ public class DorisCatalogIT extends AbstractDorisIT {
         Assertions.assertTrue(catalog.tableExists(tablePath));
 
         List<String> tables = catalog.listTables(tablePath.getDatabaseName());
-        Assertions.assertEquals(tables.size(), 1);
+        Assertions.assertFalse(tables.isEmpty());
 
         catalog.dropTable(tablePath, false);
         Assertions.assertFalse(catalog.tableExists(tablePath));
@@ -140,6 +147,113 @@ public class DorisCatalogIT extends AbstractDorisIT {
         }
     }
 
+    /**
+     * Verifies that the Doris sink's save-mode handler creates the target table for each way of
+     * naming it in the sink options:
+     *
+     * <ul>
+     *   <li>no database/table option: the table is expected at the upstream path {@code test.test};
+     *   <li>explicit {@code database} + {@code table} options;
+     *   <li>the combined table-identifier option;
+     *   <li>a {@code ${table_name}} placeholder, expected to resolve to the upstream table name;
+     *   <li>unsupported-type casting enabled, where an added DECIMAL(66, 22) column is expected to
+     *       be read back from the catalog as DOUBLE.
+     * </ul>
+     */
+    @Test
+    void testSaveMode() {
+        CatalogTable upstreamTable =
+                CatalogTable.of(
+                        TableIdentifier.of("doris", TablePath.of("test.test")), catalogTable);
+
+        // Only connection options: the created table must sit at the upstream table's path.
+        Map<String, Object> connectionOnly = newSinkOptions();
+        assertCreateTable(upstreamTable, ReadonlyConfig.fromMap(connectionOnly), "test.test");
+
+        // Explicit database and table options.
+        Map<String, Object> databaseAndTable = newSinkOptions();
+        databaseAndTable.put(DorisOptions.DATABASE.key(), "test2");
+        databaseAndTable.put(DorisOptions.TABLE.key(), "test2");
+        assertCreateTable(
+                upstreamTable, ReadonlyConfig.fromMap(databaseAndTable), "test2.test2");
+
+        // Single "database.table" identifier option.
+        Map<String, Object> tableIdentifier = newSinkOptions();
+        tableIdentifier.put(DorisOptions.TABLE_IDENTIFIER.key(), "test3.test3");
+        assertCreateTable(
+                upstreamTable, ReadonlyConfig.fromMap(tableIdentifier), "test3.test3");
+
+        // "${table_name}" must be replaced by the upstream table name ("test").
+        Map<String, Object> placeholderTable = newSinkOptions();
+        placeholderTable.put(DorisOptions.DATABASE.key(), "test5");
+        placeholderTable.put(DorisOptions.TABLE.key(), "${table_name}");
+        assertCreateTable(
+                upstreamTable, ReadonlyConfig.fromMap(placeholderTable), "test5.test");
+
+        // Type casting enabled, with a fifth column whose decimal precision is 66.
+        Map<String, Object> withTypeCasting = newSinkOptions();
+        withTypeCasting.put(DorisOptions.DATABASE.key(), "test4");
+        withTypeCasting.put(DorisOptions.TABLE.key(), "test4");
+        withTypeCasting.put(DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true);
+        // NOTE(review): this appends to the column list of upstreamTable in place. If
+        // CatalogTable.of(identifier, table) shares the schema with the static catalogTable,
+        // other tests in this class will also see column v3 - confirm.
+        upstreamTable
+                .getTableSchema()
+                .getColumns()
+                .add(PhysicalColumn.of("v3", new DecimalType(66, 22), 66, false, null, "v3"));
+        CatalogTable newTable =
+                assertCreateTable(
+                        upstreamTable, ReadonlyConfig.fromMap(withTypeCasting), "test4.test4");
+        // Index 4 is the v3 column appended above (after k1, k2, v1, v2).
+        Assertions.assertEquals(
+                BasicType.DOUBLE_TYPE, newTable.getTableSchema().getColumns().get(4).getDataType());
+    }
+
+    /**
+     * Builds a fresh, mutable option map holding the connection settings every sink config in
+     * {@link #testSaveMode()} needs (FE node address, username, password).
+     *
+     * <p>A plain {@link HashMap} is returned instead of using double-brace initialization, which
+     * would create an anonymous subclass per config that captures this test instance.
+     *
+     * @return a new map the caller may extend with table-specific options
+     */
+    private Map<String, Object> newSinkOptions() {
+        Map<String, Object> options = new HashMap<>();
+        options.put(DorisOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT);
+        options.put(DorisOptions.USERNAME.key(), USERNAME);
+        options.put(DorisOptions.PASSWORD.key(), PASSWORD);
+        return options;
+    }
+
+    /**
+     * Creates a Doris sink for {@code upstreamTable} with {@code config}, runs its save-mode
+     * handler, then reads the table at {@code fullName} back from the catalog and asserts that it
+     * has the same number of columns and the same column names, in order, as the upstream table.
+     *
+     * @param upstreamTable table definition passed to the sink factory
+     * @param config sink options
+     * @param fullName path of the table expected to exist afterwards, parsed with
+     *     {@code TablePath.of}
+     * @return the table as read back from the catalog
+     */
+    private CatalogTable assertCreateTable(
+            CatalogTable upstreamTable, ReadonlyConfig config, String fullName) {
+        DorisSinkFactory sinkFactory = new DorisSinkFactory();
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        TableSinkFactoryContext sinkContext =
+                new TableSinkFactoryContext(upstreamTable, config, contextClassLoader);
+        SupportSaveMode saveModeSink =
+                (SupportSaveMode) sinkFactory.createSink(sinkContext).createSink();
+        saveModeSink.getSaveModeHandler().get().handleSaveMode();
+
+        CatalogTable createdTable = catalog.getTable(TablePath.of(fullName));
+        List<Column> upstreamColumns = upstreamTable.getTableSchema().getColumns();
+        List<Column> createdColumns = createdTable.getTableSchema().getColumns();
+        Assertions.assertEquals(upstreamColumns.size(), createdColumns.size());
+        Assertions.assertIterableEquals(
+                upstreamColumns.stream().map(Column::getName).collect(Collectors.toList()),
+                createdColumns.stream().map(Column::getName).collect(Collectors.toList()));
+        return createdTable;
+    }
+
     @AfterAll
     public void close() {
         if (catalog != null) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
index a4b2fbbd74..d4d4e69f9d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
@@ -36,7 +36,8 @@ sink {
     fenodes = "10.16.10.14:8234"
     username = root
     password = ""
-    table.identifier = "test.e2e_table_sink"
+    database = "test"
+    table = "e2e_table_sink"
     sink.label-prefix = "test-cdc"
     sink.enable-2pc = "false"
     sink.enable-delete = "true"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
index f656130239..4c717c14c8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf
@@ -38,7 +38,8 @@ sink {
     fenodes = "doris_e2e:8030"
     username = root
     password = ""
-    table.identifier = "test.e2e_table_sink"
+    database = "test"
+    table = "e2e_table_sink"
     sink.enable-2pc = "false"
     sink.label-prefix = "test_doris"
     doris.config = {


Reply via email to