This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 268d76cbf3 [Improve] doris options (#8745) 268d76cbf3 is described below commit 268d76cbf3961d7aa72d7892ee293b7924131d1d Author: Jarvis <jar...@apache.org> AuthorDate: Tue Feb 25 21:50:12 2025 +0800 [Improve] doris options (#8745) --- .../connectors/doris/catalog/DorisCatalog.java | 8 +-- .../doris/catalog/DorisCatalogFactory.java | 20 ++++-- .../{DorisOptions.java => DorisBaseOptions.java} | 33 +++------ .../connectors/doris/config/DorisSinkConfig.java | 14 ++-- .../connectors/doris/config/DorisSinkOptions.java | 82 +++++++--------------- .../connectors/doris/config/DorisSourceConfig.java | 8 +-- .../doris/config/DorisSourceOptions.java | 43 ++++++------ .../connectors/doris/config/DorisTableConfig.java | 6 +- .../doris/datatype/AbstractDorisTypeConverter.java | 2 +- .../doris/datatype/DorisTypeConverterV1.java | 2 +- .../doris/datatype/DorisTypeConverterV2.java | 2 +- .../connectors/doris/sink/DorisSinkFactory.java | 43 ++++++++++-- .../doris/source/DorisSourceFactory.java | 42 +++++------ .../doris/source/reader/DorisValueReader.java | 4 +- .../e2e/connector/doris/DorisCatalogIT.java | 74 ++++++++++--------- 15 files changed, 191 insertions(+), 192 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java index ebc8f93785..0de8e352c4 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java @@ -39,7 +39,7 @@ import org.apache.seatunnel.api.table.converter.TypeConverter; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions; import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory; import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2; import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil; @@ -463,9 +463,9 @@ public class DorisCatalog implements Catalog { private Map<String, String> connectorOptions() { Map<String, String> options = new HashMap<>(); options.put("connector", "doris"); - options.put(DorisOptions.FENODES.key(), String.join(",", frontEndNodes)); - options.put(DorisOptions.USERNAME.key(), username); - options.put(DorisOptions.PASSWORD.key(), password); + options.put(DorisBaseOptions.FENODES.key(), String.join(",", frontEndNodes)); + options.put(DorisBaseOptions.USERNAME.key(), username); + options.put(DorisBaseOptions.PASSWORD.key(), password); return options; } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java index 7fd1da603e..3d4d612825 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java @@ -22,12 +22,12 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions; import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER; import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE; @AutoService(Factory.class) @@ -37,10 +37,10 @@ public class DorisCatalogFactory implements CatalogFactory { public Catalog createCatalog(String catalogName, ReadonlyConfig options) { return new DorisCatalog( catalogName, - options.get(DorisOptions.FENODES), - options.get(DorisOptions.QUERY_PORT), - options.get(DorisOptions.USERNAME), - options.get(DorisOptions.PASSWORD), + options.get(DorisBaseOptions.FENODES), + options.get(DorisBaseOptions.QUERY_PORT), + options.get(DorisBaseOptions.USERNAME), + options.get(DorisBaseOptions.PASSWORD), options.get(SAVE_MODE_CREATE_TEMPLATE), options.get(DorisSinkOptions.DEFAULT_DATABASE)); } @@ -52,6 +52,12 @@ public class DorisCatalogFactory implements CatalogFactory { @Override public OptionRule optionRule() { - return DorisOptions.CATALOG_RULE.build(); + return OptionRule.builder() + .required( + DorisBaseOptions.FENODES, + DorisBaseOptions.QUERY_PORT, + DorisBaseOptions.USERNAME, + DorisBaseOptions.PASSWORD) + .build(); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java similarity index 70% rename from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java rename to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java index bcdf24c9d7..2d5989dd2e 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java @@ -19,58 +19,45 @@ package org.apache.seatunnel.connectors.doris.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.api.configuration.util.OptionRule; -public interface DorisOptions { +public class DorisBaseOptions { - String IDENTIFIER = "Doris"; - String DORIS_DEFAULT_CLUSTER = "default_cluster"; - int DORIS_BATCH_SIZE_DEFAULT = 1024; + public static final String IDENTIFIER = "Doris"; // common option - Option<String> FENODES = + public static final Option<String> FENODES = Options.key("fenodes") .stringType() .noDefaultValue() .withDescription("doris fe http address."); - Option<Integer> QUERY_PORT = + public static final Option<Integer> QUERY_PORT = Options.key("query-port") .intType() .defaultValue(9030) .withDescription("doris query port"); - @Deprecated - Option<String> TABLE_IDENTIFIER = - Options.key("table.identifier") - .stringType() - .noDefaultValue() - .withDescription("the doris table name."); - - Option<String> USERNAME = + public static final Option<String> USERNAME = Options.key("username") .stringType() .noDefaultValue() .withDescription("the doris user name."); - Option<String> PASSWORD = + public static final Option<String> PASSWORD = Options.key("password") .stringType() .noDefaultValue() .withDescription("the doris password."); - Option<String> TABLE = + public static final Option<String> TABLE = Options.key("table").stringType().noDefaultValue().withDescription("table"); - Option<String> DATABASE = + public static final Option<String> DATABASE = Options.key("database").stringType().noDefaultValue().withDescription("database"); - Option<Integer> DORIS_BATCH_SIZE = + public static final Option<Integer> DORIS_BATCH_SIZE = Options.key("doris.batch.size") .intType() - .defaultValue(DORIS_BATCH_SIZE_DEFAULT) + .defaultValue(1024) .withDescription("the batch size of the doris read/write."); - - OptionRule.Builder CATALOG_RULE = - OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java index 8f0d948042..ee750e15c3 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java @@ -29,13 +29,13 @@ import java.io.Serializable; import java.util.Map; import java.util.Properties; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME; import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX; import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java index 9884eb0554..28494eb40e 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java @@ -19,68 +19,59 @@ package org.apache.seatunnel.connectors.doris.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SaveModePlaceHolder; import org.apache.seatunnel.api.sink.SchemaSaveMode; import java.util.Map; -import static org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +public class DorisSinkOptions extends DorisBaseOptions { -public interface DorisSinkOptions { - - int DEFAULT_SINK_CHECK_INTERVAL = 10000; - int DEFAULT_SINK_MAX_RETRIES = 3; - int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024; - int DEFAULT_SINK_BUFFER_COUNT = 3; + @Deprecated + public static final Option<String> TABLE_IDENTIFIER = + Options.key("table.identifier") + .stringType() + .noDefaultValue() + .withDescription("the doris table name."); - Option<Boolean> SINK_ENABLE_2PC = + public static final Option<Boolean> SINK_ENABLE_2PC = Options.key("sink.enable-2pc") .booleanType() .defaultValue(false) .withDescription("enable 2PC while loading"); - Option<Integer> SINK_CHECK_INTERVAL = + public static final Option<Integer> SINK_CHECK_INTERVAL = Options.key("sink.check-interval") .intType() - .defaultValue(DEFAULT_SINK_CHECK_INTERVAL) + .defaultValue(10000) .withDescription("check exception with the interval while loading"); - Option<Integer> SINK_MAX_RETRIES = + public static final Option<Integer> SINK_MAX_RETRIES = Options.key("sink.max-retries") .intType() - .defaultValue(DEFAULT_SINK_MAX_RETRIES) + .defaultValue(3) .withDescription("the max retry times if writing records to database failed."); - Option<Integer> SINK_BUFFER_SIZE = + public static final Option<Integer> SINK_BUFFER_SIZE = Options.key("sink.buffer-size") .intType() - .defaultValue(DEFAULT_SINK_BUFFER_SIZE) + .defaultValue(256 * 1024) .withDescription("the buffer size to cache data for stream load."); - Option<Integer> SINK_BUFFER_COUNT = + public static final Option<Integer> SINK_BUFFER_COUNT = Options.key("sink.buffer-count") .intType() - .defaultValue(DEFAULT_SINK_BUFFER_COUNT) + .defaultValue(3) .withDescription("the buffer count to cache data for stream load."); - Option<String> SINK_LABEL_PREFIX = + public static final Option<String> SINK_LABEL_PREFIX = Options.key("sink.label-prefix") .stringType() .defaultValue("") .withDescription("the unique label prefix."); - Option<Boolean> SINK_ENABLE_DELETE = + public static final Option<Boolean> SINK_ENABLE_DELETE = Options.key("sink.enable-delete") .booleanType() .defaultValue(false) .withDescription("whether to enable the delete function"); - Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX = + public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX = Options.key("doris.config") .mapType() .noDefaultValue() @@ -88,28 +79,28 @@ public interface DorisSinkOptions { "The parameter of the Stream Load data_desc. " + "The way to specify the parameter is to add the prefix `doris.config` to the original load parameter name "); - Option<String> DEFAULT_DATABASE = + public static final Option<String> DEFAULT_DATABASE = Options.key("default-database") .stringType() .defaultValue("information_schema") .withDescription(""); - Option<SchemaSaveMode> SCHEMA_SAVE_MODE = + public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE = Options.key("schema_save_mode") .enumType(SchemaSaveMode.class) .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) .withDescription("schema_save_mode"); - Option<DataSaveMode> DATA_SAVE_MODE = + public static final Option<DataSaveMode> DATA_SAVE_MODE = Options.key("data_save_mode") .enumType(DataSaveMode.class) .defaultValue(DataSaveMode.APPEND_DATA) .withDescription("data_save_mode"); - Option<String> CUSTOM_SQL = + public static final Option<String> CUSTOM_SQL = Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql"); - Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING = + public static final Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING = Options.key("needs_unsupported_type_casting") .booleanType() .defaultValue(false) @@ -117,7 +108,7 @@ public interface DorisSinkOptions { "Whether to enable the unsupported type casting, such as Decimal64 to Double"); // create table - Option<String> SAVE_MODE_CREATE_TEMPLATE = + public static final Option<String> SAVE_MODE_CREATE_TEMPLATE = Options.key("save_mode_create_template") .stringType() .defaultValue( @@ -147,27 +138,4 @@ public interface DorisSinkOptions { + "\"disable_auto_compaction\" = \"false\"\n" + ")") .withDescription("Create table statement template, used to create Doris table"); - - OptionRule.Builder SINK_RULE = - OptionRule.builder() - .required( - FENODES, - USERNAME, - PASSWORD, - SINK_LABEL_PREFIX, - DORIS_SINK_CONFIG_PREFIX, - DATA_SAVE_MODE, - SCHEMA_SAVE_MODE) - .optional( - DATABASE, - TABLE, - TABLE_IDENTIFIER, - QUERY_PORT, - DORIS_BATCH_SIZE, - SINK_ENABLE_2PC, - SINK_ENABLE_DELETE, - MULTI_TABLE_SINK_REPLICA, - SAVE_MODE_CREATE_TEMPLATE, - NEEDS_UNSUPPORTED_TYPE_CASTING) - .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java index 999f8fbfea..94615fcd85 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java @@ -25,10 +25,10 @@ import lombok.experimental.SuperBuilder; import java.io.Serializable; import java.util.List; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java index 2ee852ffcc..49b9f85ea7 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceOptions.java @@ -22,80 +22,81 @@ import org.apache.seatunnel.api.configuration.Options; import java.util.List; -public interface DorisSourceOptions { +public class DorisSourceOptions extends DorisBaseOptions { - int DORIS_TABLET_SIZE_MIN = 1; - int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; - int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; - int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; - int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; - int DORIS_REQUEST_RETRIES_DEFAULT = 3; - Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; - int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; - long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; + public static final String DORIS_DEFAULT_CLUSTER = "default_cluster"; + public static final int DORIS_TABLET_SIZE_MIN = 1; + public static final int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; + public static final int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; + public static final int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; + public static final int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600; + public static final int DORIS_REQUEST_RETRIES_DEFAULT = 3; + public static final Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; + public static final int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; + public static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; - Option<List<DorisTableConfig>> TABLE_LIST = + public static final Option<List<DorisTableConfig>> TABLE_LIST = Options.key("table_list") .listType(DorisTableConfig.class) .noDefaultValue() .withDescription("table list config."); - Option<String> DORIS_READ_FIELD = + public static final Option<String> DORIS_READ_FIELD = Options.key("doris.read.field") .stringType() .noDefaultValue() .withDescription( "List of column names in the Doris table, separated by commas"); - Option<String> DORIS_FILTER_QUERY = + public static final Option<String> DORIS_FILTER_QUERY = Options.key("doris.filter.query") .stringType() .noDefaultValue() .withDescription( "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); - Option<Integer> DORIS_TABLET_SIZE = + public static final Option<Integer> DORIS_TABLET_SIZE = Options.key("doris.request.tablet.size") .intType() .defaultValue(DORIS_TABLET_SIZE_DEFAULT) .withDescription(""); - Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = + public static final Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = Options.key("doris.request.connect.timeout.ms") .intType() .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) .withDescription(""); - Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = + public static final Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = Options.key("doris.request.read.timeout.ms") .intType() .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) .withDescription(""); - Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = + public static final Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = Options.key("doris.request.query.timeout.s") .intType() .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) .withDescription(""); - Option<Integer> DORIS_REQUEST_RETRIES = + public static final Option<Integer> DORIS_REQUEST_RETRIES = Options.key("doris.request.retries") .intType() .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) .withDescription(""); - Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = + public static final Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = Options.key("doris.deserialize.arrow.async") .booleanType() .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) .withDescription(""); - Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = + public static final Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = Options.key("doris.request.retriesdoris.deserialize.queue.size") .intType() .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) .withDescription(""); - Option<Long> DORIS_EXEC_MEM_LIMIT = + public static final Option<Long> DORIS_EXEC_MEM_LIMIT = Options.key("doris.exec.mem.limit") .longType() .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java index 624d25636b..bfd2fc7873 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java @@ -35,9 +35,9 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY; import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java index 67266b453f..a4888f3fdd 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java @@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.Locale; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER; @Slf4j public abstract class AbstractDorisTypeConverter implements TypeConverter<BasicTypeDefine> { diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java index 9b7e98368f..c508b9a14c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java @@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.LocalTimeType; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER; /** Doris type converter for version 1.2.x */ @Slf4j diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java index 46ae79251e..4a24e1d5da 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java @@ -36,7 +36,7 @@ import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER; /** Doris type converter for version 2.x */ @Slf4j diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index 9a2ce67be2..009d060506 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -19,6 +19,8 @@ package org.apache.seatunnel.connectors.doris.sink; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.options.SinkConnectorCommonOptions; +import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -38,10 +40,8 @@ import com.google.auto.service.AutoService; import java.util.Arrays; import java.util.List; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE; import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; @AutoService(Factory.class) @@ -49,12 +49,41 @@ public class DorisSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { - return IDENTIFIER; + return DorisSinkOptions.IDENTIFIER; } @Override public OptionRule optionRule() { - return DorisSinkOptions.SINK_RULE.build(); + return OptionRule.builder() + .required( + DorisSinkOptions.FENODES, + DorisSinkOptions.USERNAME, + DorisSinkOptions.PASSWORD, + DorisSinkOptions.SINK_LABEL_PREFIX, + DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX, + DorisSinkOptions.DATA_SAVE_MODE, + DorisSinkOptions.SCHEMA_SAVE_MODE) + .optional( + DorisSinkOptions.DATABASE, + DorisSinkOptions.TABLE, + DorisSinkOptions.TABLE_IDENTIFIER, + DorisSinkOptions.QUERY_PORT, + DorisSinkOptions.DORIS_BATCH_SIZE, + DorisSinkOptions.SINK_ENABLE_2PC, + DorisSinkOptions.SINK_ENABLE_DELETE, + DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE, + DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING, + DorisSinkOptions.SINK_CHECK_INTERVAL, + DorisSinkOptions.SINK_MAX_RETRIES, + DorisSinkOptions.SINK_BUFFER_SIZE, + DorisSinkOptions.SINK_BUFFER_COUNT, + DorisSinkOptions.DEFAULT_DATABASE, + SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA) + .conditional( + DorisSinkOptions.DATA_SAVE_MODE, + DataSaveMode.CUSTOM_PROCESSING, + DorisSinkOptions.CUSTOM_SQL) + .build(); } @Override @@ -79,7 +108,7 @@ public class DorisSinkFactory implements TableSinkFactory { TableIdentifier tableId = catalogTable.getTableId(); String tableName; String databaseName; - String tableIdentifier = options.get(TABLE_IDENTIFIER); + String tableIdentifier = options.get(DorisSinkOptions.TABLE_IDENTIFIER); if (StringUtils.isNotEmpty(tableIdentifier)) { tableName = tableIdentifier.split("\\.")[1]; databaseName = tableIdentifier.split("\\.")[0]; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java index 05f3e408ed..1bdbd86883 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory; import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions; import org.apache.seatunnel.connectors.doris.config.DorisTableConfig; import org.apache.commons.lang3.StringUtils; @@ -43,37 +44,36 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; -import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY; -import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD; -import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.TABLE_LIST; - @Slf4j @AutoService(Factory.class) public class DorisSourceFactory implements TableSourceFactory { @Override public String factoryIdentifier() { - return IDENTIFIER; + return DorisSourceOptions.IDENTIFIER; } @Override public OptionRule optionRule() { return OptionRule.builder() - .required(FENODES, USERNAME, PASSWORD) - .optional(TABLE_LIST) - .optional(DATABASE) - .optional(TABLE) - .optional(DORIS_FILTER_QUERY) - .optional(DORIS_READ_FIELD) - .optional(QUERY_PORT) - .optional(DORIS_BATCH_SIZE) + .required( + DorisSourceOptions.FENODES, + DorisSourceOptions.USERNAME, + DorisSourceOptions.PASSWORD) + .optional(DorisSourceOptions.TABLE_LIST) + .optional(DorisSourceOptions.DATABASE) + .optional(DorisSourceOptions.TABLE) + .optional(DorisSourceOptions.DORIS_FILTER_QUERY) + .optional(DorisSourceOptions.DORIS_TABLET_SIZE) + .optional(DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS) + .optional(DorisSourceOptions.DORIS_REQUEST_READ_TIMEOUT_MS) + .optional(DorisSourceOptions.DORIS_REQUEST_QUERY_TIMEOUT_S) + .optional(DorisSourceOptions.DORIS_REQUEST_RETRIES) + .optional(DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC) + .optional(DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE) + .optional(DorisSourceOptions.DORIS_READ_FIELD) + .optional(DorisSourceOptions.QUERY_PORT) + .optional(DorisSourceOptions.DORIS_BATCH_SIZE) + .optional(DorisSourceOptions.DORIS_EXEC_MEM_LIMIT) .build(); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java index e68366cf26..6ec8d47c5c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/reader/DorisValueReader.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.doris.backend.BackendClient; import org.apache.seatunnel.connectors.doris.config.DorisSourceConfig; +import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; @@ -45,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_DEFAULT_CLUSTER; import static org.apache.seatunnel.connectors.doris.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; @Slf4j @@ -115,7 +115,7 @@ public class DorisValueReader { private TScanOpenParams openParams() { TScanOpenParams params = new TScanOpenParams(); - params.setCluster(DORIS_DEFAULT_CLUSTER); + params.setCluster(DorisSourceOptions.DORIS_DEFAULT_CLUSTER); params.setDatabase(partition.getDatabase()); params.setTable(partition.getTable()); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java index 2a55cc350c..50db297dbf 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java @@ -34,7 +34,7 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory; -import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions; import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions; import org.apache.seatunnel.connectors.doris.config.DorisSourceOptions; import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory; @@ -99,10 +99,10 @@ public class DorisCatalogIT extends AbstractDorisIT { factory = new DorisCatalogFactory(); Map<String, Object> map = new HashMap<>(); - map.put(DorisOptions.FENODES.key(), frontEndNodes); - map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT); - map.put(DorisOptions.USERNAME.key(), USERNAME); - map.put(DorisOptions.PASSWORD.key(), PASSWORD); + map.put(DorisBaseOptions.FENODES.key(), frontEndNodes); + map.put(DorisBaseOptions.QUERY_PORT.key(), QUERY_PORT); + map.put(DorisBaseOptions.USERNAME.key(), USERNAME); + map.put(DorisBaseOptions.PASSWORD.key(), PASSWORD); catalog = (DorisCatalog) factory.createCatalog(catalogName, ReadonlyConfig.fromMap(map)); @@ -163,10 +163,10 @@ public class DorisCatalogIT extends AbstractDorisIT { new HashMap<String, Object>() { { put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisBaseOptions.USERNAME.key(), USERNAME); + put(DorisBaseOptions.PASSWORD.key(), PASSWORD); } }); assertCreateTable(upstreamTable, config, "test.test"); @@ -176,12 +176,12 @@ public class DorisCatalogIT extends AbstractDorisIT { new HashMap<String, Object>() { { put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); - put(DorisOptions.DATABASE.key(), "test2"); - put(DorisOptions.TABLE.key(), "test2"); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisBaseOptions.DATABASE.key(), "test2"); + put(DorisBaseOptions.TABLE.key(), "test2"); + put(DorisBaseOptions.USERNAME.key(), USERNAME); + put(DorisBaseOptions.PASSWORD.key(), PASSWORD); } }); assertCreateTable(upstreamTable, config2, "test2.test2"); @@ -191,11 +191,11 @@ public class DorisCatalogIT extends AbstractDorisIT { new HashMap<String, Object>() { { put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); - put(DorisOptions.TABLE_IDENTIFIER.key(), "test3.test3"); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisSinkOptions.TABLE_IDENTIFIER.key(), "test3.test3"); + put(DorisBaseOptions.USERNAME.key(), USERNAME); + put(DorisBaseOptions.PASSWORD.key(), PASSWORD); } }); assertCreateTable(upstreamTable, config3, "test3.test3"); @@ -205,12 +205,12 @@ public class DorisCatalogIT extends AbstractDorisIT { new HashMap<String, Object>() { { put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); - put(DorisOptions.DATABASE.key(), "test5"); - put(DorisOptions.TABLE.key(), "${table_name}"); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisBaseOptions.DATABASE.key(), "test5"); + put(DorisBaseOptions.TABLE.key(), "${table_name}"); + put(DorisBaseOptions.USERNAME.key(), USERNAME); + put(DorisBaseOptions.PASSWORD.key(), PASSWORD); } }); assertCreateTable(upstreamTable, config4, "test5.test"); @@ -220,12 +220,12 @@ public class DorisCatalogIT extends AbstractDorisIT { new HashMap<String, Object>() { { put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); - put(DorisOptions.DATABASE.key(), "test4"); - put(DorisOptions.TABLE.key(), "test4"); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisBaseOptions.DATABASE.key(), "test4"); + put(DorisBaseOptions.TABLE.key(), "test4"); + put(DorisBaseOptions.USERNAME.key(), USERNAME); + put(DorisBaseOptions.PASSWORD.key(), PASSWORD); put(DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true); } }); @@ -295,21 +295,29 @@ public class DorisCatalogIT extends AbstractDorisIT { ReadonlyConfig.fromMap( new HashMap<String, Object>() { { - put(DorisOptions.DATABASE.key(), DATABASE); - put(DorisOptions.TABLE.key(), SINK_TABLE); - put(DorisOptions.USERNAME.key(), USERNAME); - put(DorisOptions.PASSWORD.key(), PASSWORD); + put( + DorisBaseOptions.DATABASE.key(), + DATABASE); + put( + DorisBaseOptions.TABLE.key(), + SINK_TABLE); + put( + DorisBaseOptions.USERNAME.key(), + USERNAME); + put( + DorisBaseOptions.PASSWORD.key(), + PASSWORD); put( DorisSourceOptions.DORIS_READ_FIELD .key(), "k1,k2"); put( - DorisOptions.FENODES.key(), + DorisBaseOptions.FENODES.key(), container.getHost() + ":" + HTTP_PORT); put( - DorisOptions.QUERY_PORT.key(), + DorisBaseOptions.QUERY_PORT.key(), QUERY_PORT); } }),