This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 268d76cbf3 [Improve] doris options (#8745)
268d76cbf3 is described below

commit 268d76cbf3961d7aa72d7892ee293b7924131d1d
Author: Jarvis <jar...@apache.org>
AuthorDate: Tue Feb 25 21:50:12 2025 +0800

    [Improve] doris options (#8745)
---
 .../connectors/doris/catalog/DorisCatalog.java     |  8 +--
 .../doris/catalog/DorisCatalogFactory.java         | 20 ++++--
 .../{DorisOptions.java => DorisBaseOptions.java}   | 33 +++------
 .../connectors/doris/config/DorisSinkConfig.java   | 14 ++--
 .../connectors/doris/config/DorisSinkOptions.java  | 82 +++++++---------------
 .../connectors/doris/config/DorisSourceConfig.java |  8 +--
 .../doris/config/DorisSourceOptions.java           | 43 ++++++------
 .../connectors/doris/config/DorisTableConfig.java  |  6 +-
 .../doris/datatype/AbstractDorisTypeConverter.java |  2 +-
 .../doris/datatype/DorisTypeConverterV1.java       |  2 +-
 .../doris/datatype/DorisTypeConverterV2.java       |  2 +-
 .../connectors/doris/sink/DorisSinkFactory.java    | 43 ++++++++++--
 .../doris/source/DorisSourceFactory.java           | 42 +++++------
 .../doris/source/reader/DorisValueReader.java      |  4 +-
 .../e2e/connector/doris/DorisCatalogIT.java        | 74 ++++++++++---------
 15 files changed, 191 insertions(+), 192 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index ebc8f93785..0de8e352c4 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -39,7 +39,7 @@ import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
 import 
org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory;
 import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2;
 import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
@@ -463,9 +463,9 @@ public class DorisCatalog implements Catalog {
     private Map<String, String> connectorOptions() {
         Map<String, String> options = new HashMap<>();
         options.put("connector", "doris");
-        options.put(DorisOptions.FENODES.key(), String.join(",", frontEndNodes));
-        options.put(DorisOptions.USERNAME.key(), username);
-        options.put(DorisOptions.PASSWORD.key(), password);
+        options.put(DorisBaseOptions.FENODES.key(), String.join(",", frontEndNodes));
+        options.put(DorisBaseOptions.USERNAME.key(), username);
+        options.put(DorisBaseOptions.PASSWORD.key(), password);
         return options;
     }
 
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
index 7fd1da603e..3d4d612825 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java
@@ -22,12 +22,12 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
 import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
 
 @AutoService(Factory.class)
@@ -37,10 +37,10 @@ public class DorisCatalogFactory implements CatalogFactory {
     public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
         return new DorisCatalog(
                 catalogName,
-                options.get(DorisOptions.FENODES),
-                options.get(DorisOptions.QUERY_PORT),
-                options.get(DorisOptions.USERNAME),
-                options.get(DorisOptions.PASSWORD),
+                options.get(DorisBaseOptions.FENODES),
+                options.get(DorisBaseOptions.QUERY_PORT),
+                options.get(DorisBaseOptions.USERNAME),
+                options.get(DorisBaseOptions.PASSWORD),
                 options.get(SAVE_MODE_CREATE_TEMPLATE),
                 options.get(DorisSinkOptions.DEFAULT_DATABASE));
     }
@@ -52,6 +52,12 @@ public class DorisCatalogFactory implements CatalogFactory {
 
     @Override
     public OptionRule optionRule() {
-        return DorisOptions.CATALOG_RULE.build();
+        return OptionRule.builder()
+                .required(
+                        DorisBaseOptions.FENODES,
+                        DorisBaseOptions.QUERY_PORT,
+                        DorisBaseOptions.USERNAME,
+                        DorisBaseOptions.PASSWORD)
+                .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
similarity index 70%
rename from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
rename to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
index bcdf24c9d7..2d5989dd2e 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java
@@ -19,58 +19,45 @@ package org.apache.seatunnel.connectors.doris.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
 
-public interface DorisOptions {
+public class DorisBaseOptions {
 
-    String IDENTIFIER = "Doris";
-    String DORIS_DEFAULT_CLUSTER = "default_cluster";
-    int DORIS_BATCH_SIZE_DEFAULT = 1024;
+    public static final String IDENTIFIER = "Doris";
 
     // common option
-    Option<String> FENODES =
+    public static final Option<String> FENODES =
             Options.key("fenodes")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("doris fe http address.");
 
-    Option<Integer> QUERY_PORT =
+    public static final Option<Integer> QUERY_PORT =
             Options.key("query-port")
                     .intType()
                     .defaultValue(9030)
                     .withDescription("doris query port");
 
-    @Deprecated
-    Option<String> TABLE_IDENTIFIER =
-            Options.key("table.identifier")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the doris table name.");
-
-    Option<String> USERNAME =
+    public static final Option<String> USERNAME =
             Options.key("username")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the doris user name.");
 
-    Option<String> PASSWORD =
+    public static final Option<String> PASSWORD =
             Options.key("password")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the doris password.");
 
-    Option<String> TABLE =
+    public static final Option<String> TABLE =
             
Options.key("table").stringType().noDefaultValue().withDescription("table");
 
-    Option<String> DATABASE =
+    public static final Option<String> DATABASE =
             
Options.key("database").stringType().noDefaultValue().withDescription("database");
 
-    Option<Integer> DORIS_BATCH_SIZE =
+    public static final Option<Integer> DORIS_BATCH_SIZE =
             Options.key("doris.batch.size")
                     .intType()
-                    .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+                    .defaultValue(1024)
                     .withDescription("the batch size of the doris 
read/write.");
-
-    OptionRule.Builder CATALOG_RULE =
-            OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
index 8f0d948042..ee750e15c3 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
@@ -29,13 +29,13 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
index 9884eb0554..28494eb40e 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
@@ -19,68 +19,59 @@ package org.apache.seatunnel.connectors.doris.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import java.util.Map;
 
-import static 
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+public class DorisSinkOptions extends DorisBaseOptions {
 
-public interface DorisSinkOptions {
-
-    int DEFAULT_SINK_CHECK_INTERVAL = 10000;
-    int DEFAULT_SINK_MAX_RETRIES = 3;
-    int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
-    int DEFAULT_SINK_BUFFER_COUNT = 3;
+    @Deprecated
+    public static final Option<String> TABLE_IDENTIFIER =
+            Options.key("table.identifier")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris table name.");
 
-    Option<Boolean> SINK_ENABLE_2PC =
+    public static final Option<Boolean> SINK_ENABLE_2PC =
             Options.key("sink.enable-2pc")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("enable 2PC while loading");
 
-    Option<Integer> SINK_CHECK_INTERVAL =
+    public static final Option<Integer> SINK_CHECK_INTERVAL =
             Options.key("sink.check-interval")
                     .intType()
-                    .defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
+                    .defaultValue(10000)
                     .withDescription("check exception with the interval while 
loading");
-    Option<Integer> SINK_MAX_RETRIES =
+    public static final Option<Integer> SINK_MAX_RETRIES =
             Options.key("sink.max-retries")
                     .intType()
-                    .defaultValue(DEFAULT_SINK_MAX_RETRIES)
+                    .defaultValue(3)
                     .withDescription("the max retry times if writing records 
to database failed.");
-    Option<Integer> SINK_BUFFER_SIZE =
+    public static final Option<Integer> SINK_BUFFER_SIZE =
             Options.key("sink.buffer-size")
                     .intType()
-                    .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
+                    .defaultValue(256 * 1024)
                     .withDescription("the buffer size to cache data for stream 
load.");
-    Option<Integer> SINK_BUFFER_COUNT =
+    public static final Option<Integer> SINK_BUFFER_COUNT =
             Options.key("sink.buffer-count")
                     .intType()
-                    .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
+                    .defaultValue(3)
                     .withDescription("the buffer count to cache data for 
stream load.");
-    Option<String> SINK_LABEL_PREFIX =
+    public static final Option<String> SINK_LABEL_PREFIX =
             Options.key("sink.label-prefix")
                     .stringType()
                     .defaultValue("")
                     .withDescription("the unique label prefix.");
-    Option<Boolean> SINK_ENABLE_DELETE =
+    public static final Option<Boolean> SINK_ENABLE_DELETE =
             Options.key("sink.enable-delete")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("whether to enable the delete function");
 
-    Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
+    public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
             Options.key("doris.config")
                     .mapType()
                     .noDefaultValue()
@@ -88,28 +79,28 @@ public interface DorisSinkOptions {
                             "The parameter of the Stream Load data_desc. "
                                     + "The way to specify the parameter is to 
add the prefix `doris.config` to the original load parameter name ");
 
-    Option<String> DEFAULT_DATABASE =
+    public static final Option<String> DEFAULT_DATABASE =
             Options.key("default-database")
                     .stringType()
                     .defaultValue("information_schema")
                     .withDescription("");
 
-    Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
+    public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
                     .enumType(SchemaSaveMode.class)
                     .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
                     .withDescription("schema_save_mode");
 
-    Option<DataSaveMode> DATA_SAVE_MODE =
+    public static final Option<DataSaveMode> DATA_SAVE_MODE =
             Options.key("data_save_mode")
                     .enumType(DataSaveMode.class)
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data_save_mode");
 
-    Option<String> CUSTOM_SQL =
+    public static final Option<String> CUSTOM_SQL =
             
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
 
-    Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
+    public static final Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
             Options.key("needs_unsupported_type_casting")
                     .booleanType()
                     .defaultValue(false)
@@ -117,7 +108,7 @@ public interface DorisSinkOptions {
                             "Whether to enable the unsupported type casting, 
such as Decimal64 to Double");
 
     // create table
-    Option<String> SAVE_MODE_CREATE_TEMPLATE =
+    public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
             Options.key("save_mode_create_template")
                     .stringType()
                     .defaultValue(
@@ -147,27 +138,4 @@ public interface DorisSinkOptions {
                                     + "\"disable_auto_compaction\" = 
\"false\"\n"
                                     + ")")
                     .withDescription("Create table statement template, used to 
create Doris table");
-
-    OptionRule.Builder SINK_RULE =
-            OptionRule.builder()
-                    .required(
-                            FENODES,
-                            USERNAME,
-                            PASSWORD,
-                            SINK_LABEL_PREFIX,
-                            DORIS_SINK_CONFIG_PREFIX,
-                            DATA_SAVE_MODE,
-                            SCHEMA_SAVE_MODE)
-                    .optional(
-                            DATABASE,
-                            TABLE,
-                            TABLE_IDENTIFIER,
-                            QUERY_PORT,
-                            DORIS_BATCH_SIZE,
-                            SINK_ENABLE_2PC,
-                            SINK_ENABLE_DELETE,
-                            MULTI_TABLE_SINK_REPLICA,
-                            SAVE_MODE_CREATE_TEMPLATE,
-                            NEEDS_UNSUPPORTED_TYPE_CASTING)
-                    .conditional(DATA_SAVE_MODE, 
DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL);
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
index 999f8fbfea..94615fcd85 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java
@@ -25,10 +25,10 @@ import lombok.experimental.SuperBuilder;
 import java.io.Serializable;
 import java.util.List;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
index 2ee852ffcc..49b9f85ea7 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java
@@ -22,80 +22,81 @@ import org.apache.seatunnel.api.configuration.Options;
 
 import java.util.List;
 
-public interface DorisSourceOptions {
+public class DorisSourceOptions extends DorisBaseOptions {
 
-    int DORIS_TABLET_SIZE_MIN = 1;
-    int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
-    int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
-    int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
-    int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
-    int DORIS_REQUEST_RETRIES_DEFAULT = 3;
-    Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
-    int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
-    long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
+    public static final String DORIS_DEFAULT_CLUSTER = "default_cluster";
+    public static final int DORIS_TABLET_SIZE_MIN = 1;
+    public static final int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
+    public static final int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    public static final int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    public static final int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
+    public static final int DORIS_REQUEST_RETRIES_DEFAULT = 3;
+    public static final Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
+    public static final int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+    public static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
 
-    Option<List<DorisTableConfig>> TABLE_LIST =
+    public static final Option<List<DorisTableConfig>> TABLE_LIST =
             Options.key("table_list")
                     .listType(DorisTableConfig.class)
                     .noDefaultValue()
                     .withDescription("table list config.");
 
-    Option<String> DORIS_READ_FIELD =
+    public static final Option<String> DORIS_READ_FIELD =
             Options.key("doris.read.field")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             "List of column names in the Doris table, 
separated by commas");
-    Option<String> DORIS_FILTER_QUERY =
+    public static final Option<String> DORIS_FILTER_QUERY =
             Options.key("doris.filter.query")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             "Filter expression of the query, which is 
transparently transmitted to Doris. Doris uses this expression to complete 
source-side data filtering");
 
-    Option<Integer> DORIS_TABLET_SIZE =
+    public static final Option<Integer> DORIS_TABLET_SIZE =
             Options.key("doris.request.tablet.size")
                     .intType()
                     .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
                     .withDescription("");
 
-    Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+    public static final Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
             Options.key("doris.request.connect.timeout.ms")
                     .intType()
                     .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
                     .withDescription("");
 
-    Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
+    public static final Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
             Options.key("doris.request.read.timeout.ms")
                     .intType()
                     .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
                     .withDescription("");
 
-    Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
+    public static final Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
             Options.key("doris.request.query.timeout.s")
                     .intType()
                     .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
                     .withDescription("");
 
-    Option<Integer> DORIS_REQUEST_RETRIES =
+    public static final Option<Integer> DORIS_REQUEST_RETRIES =
             Options.key("doris.request.retries")
                     .intType()
                     .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
                     .withDescription("");
 
-    Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
+    public static final Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
             Options.key("doris.deserialize.arrow.async")
                     .booleanType()
                     .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
                     .withDescription("");
 
-    Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
+    public static final Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
             Options.key("doris.request.retriesdoris.deserialize.queue.size")
                     .intType()
                     .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
                     .withDescription("");
 
-    Option<Long> DORIS_EXEC_MEM_LIMIT =
+    public static final Option<Long> DORIS_EXEC_MEM_LIMIT =
             Options.key("doris.exec.mem.limit")
                     .longType()
                     .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
index 624d25636b..bfd2fc7873 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
@@ -35,9 +35,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
index 67266b453f..a4888f3fdd 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
@@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.util.Locale;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
 
 @Slf4j
 public abstract class AbstractDorisTypeConverter implements 
TypeConverter<BasicTypeDefine> {
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
index 9b7e98368f..c508b9a14c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.LocalTimeType;
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
 
 /** Doris type converter for version 1.2.x */
 @Slf4j
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
index 46ae79251e..4a24e1d5da 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
@@ -36,7 +36,7 @@ import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
 
 /** Doris type converter for version 2.x */
 @Slf4j
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
index 9a2ce67be2..009d060506 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.doris.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.connector.TableSink;
@@ -38,10 +40,8 @@ import com.google.auto.service.AutoService;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
+import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
+import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
 import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
 
 @AutoService(Factory.class)
@@ -49,12 +49,41 @@ public class DorisSinkFactory implements TableSinkFactory {
 
     @Override
     public String factoryIdentifier() {
-        return IDENTIFIER;
+        return DorisSinkOptions.IDENTIFIER;
     }
 
     @Override
     public OptionRule optionRule() {
-        return DorisSinkOptions.SINK_RULE.build();
+        return OptionRule.builder()
+                .required(
+                        DorisSinkOptions.FENODES,
+                        DorisSinkOptions.USERNAME,
+                        DorisSinkOptions.PASSWORD,
+                        DorisSinkOptions.SINK_LABEL_PREFIX,
+                        DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX,
+                        DorisSinkOptions.DATA_SAVE_MODE,
+                        DorisSinkOptions.SCHEMA_SAVE_MODE)
+                .optional(
+                        DorisSinkOptions.DATABASE,
+                        DorisSinkOptions.TABLE,
+                        DorisSinkOptions.TABLE_IDENTIFIER,
+                        DorisSinkOptions.QUERY_PORT,
+                        DorisSinkOptions.DORIS_BATCH_SIZE,
+                        DorisSinkOptions.SINK_ENABLE_2PC,
+                        DorisSinkOptions.SINK_ENABLE_DELETE,
+                        DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+                        DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING,
+                        DorisSinkOptions.SINK_CHECK_INTERVAL,
+                        DorisSinkOptions.SINK_MAX_RETRIES,
+                        DorisSinkOptions.SINK_BUFFER_SIZE,
+                        DorisSinkOptions.SINK_BUFFER_COUNT,
+                        DorisSinkOptions.DEFAULT_DATABASE,
+                        SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .conditional(
+                        DorisSinkOptions.DATA_SAVE_MODE,
+                        DataSaveMode.CUSTOM_PROCESSING,
+                        DorisSinkOptions.CUSTOM_SQL)
+                .build();
     }
 
     @Override
@@ -79,7 +108,7 @@ public class DorisSinkFactory implements TableSinkFactory {
         TableIdentifier tableId = catalogTable.getTableId();
         String tableName;
         String databaseName;
-        String tableIdentifier = options.get(TABLE_IDENTIFIER);
+        String tableIdentifier = options.get(DorisSinkOptions.TABLE_IDENTIFIER);
         if (StringUtils.isNotEmpty(tableIdentifier)) {
             tableName = tableIdentifier.split("\\.")[1];
             databaseName = tableIdentifier.split("\\.")[0];
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
index 05f3e408ed..1bdbd86883 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java
@@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
 import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
 import org.apache.seatunnel.connectors.doris.config.DorisTableConfig;
 
 import org.apache.commons.lang3.StringUtils;
@@ -43,37 +44,36 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
-import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
-import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
-import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.TABLE_LIST;
-
 @Slf4j
 @AutoService(Factory.class)
 public class DorisSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return IDENTIFIER;
+        return DorisSourceOptions.IDENTIFIER;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(FENODES, USERNAME, PASSWORD)
-                .optional(TABLE_LIST)
-                .optional(DATABASE)
-                .optional(TABLE)
-                .optional(DORIS_FILTER_QUERY)
-                .optional(DORIS_READ_FIELD)
-                .optional(QUERY_PORT)
-                .optional(DORIS_BATCH_SIZE)
+                .required(
+                        DorisSourceOptions.FENODES,
+                        DorisSourceOptions.USERNAME,
+                        DorisSourceOptions.PASSWORD)
+                .optional(DorisSourceOptions.TABLE_LIST)
+                .optional(DorisSourceOptions.DATABASE)
+                .optional(DorisSourceOptions.TABLE)
+                .optional(DorisSourceOptions.DORIS_FILTER_QUERY)
+                .optional(DorisSourceOptions.DORIS_TABLET_SIZE)
+                .optional(DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS)
+                .optional(DorisSourceOptions.DORIS_REQUEST_READ_TIMEOUT_MS)
+                .optional(DorisSourceOptions.DORIS_REQUEST_QUERY_TIMEOUT_S)
+                .optional(DorisSourceOptions.DORIS_REQUEST_RETRIES)
+                .optional(DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC)
+                .optional(DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE)
+                .optional(DorisSourceOptions.DORIS_READ_FIELD)
+                .optional(DorisSourceOptions.QUERY_PORT)
+                .optional(DorisSourceOptions.DORIS_BATCH_SIZE)
+                .optional(DorisSourceOptions.DORIS_EXEC_MEM_LIMIT)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
index e68366cf26..6ec8d47c5c 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.backend.BackendClient;
 import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig;
+import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
 import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition;
@@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_DEFAULT_CLUSTER;
 import static org.apache.seatunnel.connectors.doris.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
 
 @Slf4j
@@ -115,7 +115,7 @@ public class DorisValueReader {
 
     private TScanOpenParams openParams() {
         TScanOpenParams params = new TScanOpenParams();
-        params.setCluster(DORIS_DEFAULT_CLUSTER);
+        params.setCluster(DorisSourceOptions.DORIS_DEFAULT_CLUSTER);
         params.setDatabase(partition.getDatabase());
         params.setTable(partition.getTable());
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 2a55cc350c..50db297dbf 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -34,7 +34,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
 import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory;
-import org.apache.seatunnel.connectors.doris.config.DorisOptions;
+import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
 import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
 import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions;
 import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
@@ -99,10 +99,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
         factory = new DorisCatalogFactory();
 
         Map<String, Object> map = new HashMap<>();
-        map.put(DorisOptions.FENODES.key(), frontEndNodes);
-        map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT);
-        map.put(DorisOptions.USERNAME.key(), USERNAME);
-        map.put(DorisOptions.PASSWORD.key(), PASSWORD);
+        map.put(DorisBaseOptions.FENODES.key(), frontEndNodes);
+        map.put(DorisBaseOptions.QUERY_PORT.key(), QUERY_PORT);
+        map.put(DorisBaseOptions.USERNAME.key(), USERNAME);
+        map.put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
 
         catalog = (DorisCatalog) factory.createCatalog(catalogName, ReadonlyConfig.fromMap(map));
 
@@ -163,10 +163,10 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         new HashMap<String, Object>() {
                             {
                                 put(
-                                        DorisOptions.FENODES.key(),
+                                        DorisBaseOptions.FENODES.key(),
                                         container.getHost() + ":" + HTTP_PORT);
-                                put(DorisOptions.USERNAME.key(), USERNAME);
-                                put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
                             }
                         });
         assertCreateTable(upstreamTable, config, "test.test");
@@ -176,12 +176,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         new HashMap<String, Object>() {
                             {
                                 put(
-                                        DorisOptions.FENODES.key(),
+                                        DorisBaseOptions.FENODES.key(),
                                         container.getHost() + ":" + HTTP_PORT);
-                                put(DorisOptions.DATABASE.key(), "test2");
-                                put(DorisOptions.TABLE.key(), "test2");
-                                put(DorisOptions.USERNAME.key(), USERNAME);
-                                put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                put(DorisBaseOptions.DATABASE.key(), "test2");
+                                put(DorisBaseOptions.TABLE.key(), "test2");
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
                             }
                         });
         assertCreateTable(upstreamTable, config2, "test2.test2");
@@ -191,11 +191,11 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         new HashMap<String, Object>() {
                             {
                                 put(
-                                        DorisOptions.FENODES.key(),
+                                        DorisBaseOptions.FENODES.key(),
                                         container.getHost() + ":" + HTTP_PORT);
-                                put(DorisOptions.TABLE_IDENTIFIER.key(), "test3.test3");
-                                put(DorisOptions.USERNAME.key(), USERNAME);
-                                put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                put(DorisSinkOptions.TABLE_IDENTIFIER.key(), "test3.test3");
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
                             }
                         });
         assertCreateTable(upstreamTable, config3, "test3.test3");
@@ -205,12 +205,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         new HashMap<String, Object>() {
                             {
                                 put(
-                                        DorisOptions.FENODES.key(),
+                                        DorisBaseOptions.FENODES.key(),
                                         container.getHost() + ":" + HTTP_PORT);
-                                put(DorisOptions.DATABASE.key(), "test5");
-                                put(DorisOptions.TABLE.key(), "${table_name}");
-                                put(DorisOptions.USERNAME.key(), USERNAME);
-                                put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                put(DorisBaseOptions.DATABASE.key(), "test5");
+                                put(DorisBaseOptions.TABLE.key(), "${table_name}");
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
                             }
                         });
         assertCreateTable(upstreamTable, config4, "test5.test");
@@ -220,12 +220,12 @@ public class DorisCatalogIT extends AbstractDorisIT {
                         new HashMap<String, Object>() {
                             {
                                 put(
-                                        DorisOptions.FENODES.key(),
+                                        DorisBaseOptions.FENODES.key(),
                                         container.getHost() + ":" + HTTP_PORT);
-                                put(DorisOptions.DATABASE.key(), "test4");
-                                put(DorisOptions.TABLE.key(), "test4");
-                                put(DorisOptions.USERNAME.key(), USERNAME);
-                                put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                put(DorisBaseOptions.DATABASE.key(), "test4");
+                                put(DorisBaseOptions.TABLE.key(), "test4");
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
                                 put(DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true);
                             }
                         });
@@ -295,21 +295,29 @@ public class DorisCatalogIT extends AbstractDorisIT {
                                         ReadonlyConfig.fromMap(
                                                 new HashMap<String, Object>() {
                                                     {
-                                                        put(DorisOptions.DATABASE.key(), DATABASE);
-                                                        put(DorisOptions.TABLE.key(), SINK_TABLE);
-                                                        put(DorisOptions.USERNAME.key(), USERNAME);
-                                                        put(DorisOptions.PASSWORD.key(), PASSWORD);
+                                                        put(
+                                                                DorisBaseOptions.DATABASE.key(),
+                                                                DATABASE);
+                                                        put(
+                                                                DorisBaseOptions.TABLE.key(),
+                                                                SINK_TABLE);
+                                                        put(
+                                                                DorisBaseOptions.USERNAME.key(),
+                                                                USERNAME);
+                                                        put(
+                                                                DorisBaseOptions.PASSWORD.key(),
+                                                                PASSWORD);
                                                         put(
                                                                 DorisSourceOptions.DORIS_READ_FIELD
                                                                         .key(),
                                                                 "k1,k2");
                                                         put(
-                                                                DorisOptions.FENODES.key(),
+                                                                DorisBaseOptions.FENODES.key(),
                                                                 container.getHost()
                                                                         + ":"
                                                                         + HTTP_PORT);
                                                         put(
-                                                                DorisOptions.QUERY_PORT.key(),
+                                                                DorisBaseOptions.QUERY_PORT.key(),
                                                                 QUERY_PORT);
                                                     }
                                                 }),

Reply via email to