This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b2375fffe8 [Feature] Support SaveMode on Doris (#6085) b2375fffe8 is described below commit b2375fffe803083ea89c201e38c8396e2def7cbe Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Thu Jan 4 14:52:10 2024 +0800 [Feature] Support SaveMode on Doris (#6085) --- docs/en/connector-v2/sink/Doris.md | 112 +++++-- .../seatunnel/api/sink/DefaultSaveModeHandler.java | 3 + .../apache/seatunnel/api/table/catalog/Column.java | 3 + .../api/table/catalog/MetadataColumn.java | 12 + .../api/table/catalog/PhysicalColumn.java | 17 + .../connectors/doris/catalog/DorisCatalog.java | 25 ++ .../doris/catalog/DorisCatalogFactory.java | 4 + .../connectors/doris/config/DorisConfig.java | 6 +- .../connectors/doris/config/DorisOptions.java | 94 ++++-- .../connectors/doris/rest/PartitionDefinition.java | 147 -------- .../connectors/doris/rest/RestService.java | 373 +-------------------- .../connectors/doris/rest/models/Backend.java | 40 --- .../connectors/doris/rest/models/BackendRow.java | 41 --- .../connectors/doris/rest/models/Field.java | 134 -------- .../connectors/doris/rest/models/QueryPlan.java | 70 ---- .../connectors/doris/rest/models/Schema.java | 108 ------ .../connectors/doris/rest/models/Tablet.java | 80 ----- .../seatunnel/connectors/doris/sink/DorisSink.java | 47 ++- .../connectors/doris/sink/DorisSinkFactory.java | 59 +++- .../doris/sink/writer/DorisSinkState.java | 4 +- .../doris/sink/writer/DorisSinkWriter.java | 33 +- .../doris/sink/writer/DorisStreamLoad.java | 7 +- .../connectors/doris/util/DorisCatalogUtil.java | 80 +++-- .../seatunnel/connectors/doris/util/IOUtils.java | 49 --- .../doris/util/UnsupportedTypeConverterUtils.java | 101 ++++++ .../doris/catalog/DorisCreateTableTest.java | 280 ++++++++++++++++ .../e2e/connector/doris/DorisCatalogIT.java | 152 +++++++-- .../resources/write-cdc-changelog-to-doris.conf | 3 +- .../src/test/resources/doris-jdbc-to-doris.conf | 3 +- 29 files changed, 935 insertions(+), 1152 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index de0c47453a..a485eaf8c7 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -32,21 +32,91 @@ Version Supported ## Sink Options -| Name | Type | Required | Default | Description | -|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | -| username | String | Yes | - | `Doris` user username | -| password | String | Yes | - | `Doris` user password | -| table.identifier | String | Yes | - | The name of `Doris` table | -| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | -| sink.enable-2pc | bool | No | - | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). | -| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) | -| sink.check-interval | int | No | 10000 | check exception with the interval while loading | -| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | -| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | -| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | -| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | -| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | +| Name | Type | Required | Default | Description | +|--------------------------------|---------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | +| query-port | int | No | 9030 | `Doris` Fenodes query_port | +| username | String | Yes | - | `Doris` user username | +| password | String | Yes | - | `Doris` user password | +| database | String | Yes | - | The database name of `Doris` table, use `${database_name}` to represent the upstream table name | +| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name | +| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. | +| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | +| sink.enable-2pc | bool | No | - | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). | +| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) | +| sink.check-interval | int | No | 10000 | check exception with the interval while loading | +| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | +| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | +| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | +| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | +| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | +| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | +| save_mode_create_template | string | no | see below | see below | +| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | +| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | + +### schema_save_mode[Enum] + +Before the synchronous task is turned on, different treatment schemes are selected for the existing surface structure of the target side. +Option introduction: +`RECREATE_SCHEMA` :Will create when the table does not exist, delete and rebuild when the table is saved +`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, skipped when the table is saved +`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not exist + +### data_save_mode[Enum] + +Before the synchronous task is turned on, different processing schemes are selected for data existing data on the target side. +Option introduction: +`DROP_DATA`: Preserve database structure and delete data +`APPEND_DATA`:Preserve database structure, preserve data +`CUSTOM_PROCESSING`:User defined processing +`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported + +### save_mode_create_template + +We use templates to automatically create Doris tables, +which will create corresponding table creation statements based on the type of upstream data and schema type, +and the default template can be modified according to the situation. + +```sql +CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` +( + ${rowtype_fields} +) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key}) + DISTRIBUTED BY HASH (${rowtype_primary_key}) + PROPERTIES +( + "replication_num" = "1" +); +``` + +If a custom field is filled in the template, such as adding an `id` field + +```sql +CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` +( + id, + ${rowtype_fields} +) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key}) + DISTRIBUTED BY HASH (${rowtype_primary_key}) + PROPERTIES +( + "replication_num" = "1" +); +``` + +The connector will automatically obtain the corresponding type from the upstream to complete the filling, +and remove the id field from `rowtype_fields`. This method can be used to customize the modification of field types and attributes. + +You can use the following placeholders + +- database: Used to get the database in the upstream schema +- table_name: Used to get the table name in the upstream schema +- rowtype_fields: Used to get all the fields in the upstream schema, we will automatically map to the field + description of Doris +- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list) +- rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list) ## Data Type Mapping @@ -125,7 +195,8 @@ sink { fenodes = "doris_cdc_e2e:8030" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" sink.enable-delete = "true" @@ -197,7 +268,8 @@ sink { fenodes = "doris_cdc_e2e:8030" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.label-prefix = "test-cdc" sink.enable-2pc = "true" sink.enable-delete = "true" @@ -218,7 +290,8 @@ sink { fenodes = "e2e_dorisdb:8030" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.enable-2pc = "true" sink.label-prefix = "test_json" doris.config = { @@ -238,7 +311,8 @@ sink { fenodes = "e2e_dorisdb:8030" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.enable-2pc = "true" sink.label-prefix = "test_csv" doris.config = { diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java index e9ce7b261c..9566658979 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DefaultSaveModeHandler.java @@ -136,6 +136,9 @@ public class DefaultSaveModeHandler implements SaveModeHandler { } protected void createTable() { + if (!catalog.databaseExists(tablePath.getDatabaseName())) { + catalog.createDatabase(TablePath.of(tablePath.getDatabaseName(), ""), true); + } catalog.createTable(tablePath, catalogTable, true); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java index bec10b3d75..109d025695 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Column.java @@ -133,4 +133,7 @@ public abstract class Column implements Serializable { /** Returns a copy of the column. */ public abstract Column copy(); + + /** Returns a copy of the column with a replaced name. */ + public abstract Column rename(String newColumnName); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java index 5325dac791..9294c13506 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/MetadataColumn.java @@ -70,4 +70,16 @@ public class MetadataColumn extends Column { return MetadataColumn.of( name, dataType, columnLength, metadataKey, nullable, defaultValue, comment); } + + @Override + public Column rename(String newColumnName) { + return MetadataColumn.of( + newColumnName, + dataType, + columnLength, + metadataKey, + nullable, + defaultValue, + comment); + } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java index 164752d468..926dcc2eba 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/PhysicalColumn.java @@ -144,4 +144,21 @@ public class PhysicalColumn extends Column { options, longColumnLength); } + + @Override + public Column rename(String newColumnName) { + return PhysicalColumn.of( + newColumnName, + dataType, + columnLength, + nullable, + defaultValue, + comment, + sourceType, + isUnsigned, + isZeroFill, + bitLen, + options, + longColumnLength); + } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java index 071bf1f608..0e5faef550 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java @@ -334,4 +334,29 @@ public class DorisCatalog implements Catalog { options.put(DorisOptions.PASSWORD.key(), password); return options; } + + public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (ignoreIfNotExists) { + conn.createStatement() + .execute(String.format("TRUNCATE TABLE %s", tablePath.getFullName())); + } + } catch (Exception e) { + throw new CatalogException( + String.format("Failed TRUNCATE TABLE in catalog %s", tablePath.getFullName()), + e); + } + } + + public boolean isExistsData(TablePath tablePath) { + String tableName = tablePath.getFullName(); + String sql = String.format("select * from %s limit 1;", tableName); + try (PreparedStatement ps = conn.prepareStatement(sql)) { + ResultSet resultSet = ps.executeQuery(); + return resultSet.next(); + } catch (SQLException e) { + throw new CatalogException(String.format("Failed executeSql error %s", sql), e); + } + } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java index 3df6bcca24..16db24f48a 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java @@ -21,9 +21,13 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) public class DorisCatalogFactory implements CatalogFactory { @Override diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java index 98a43c7b8e..ea93b8f551 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java @@ -41,6 +41,7 @@ import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_RE import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_SINK_CONFIG_PREFIX; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_TABLET_SIZE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE; @@ -51,7 +52,6 @@ import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENA import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_ENABLE_DELETE; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_LABEL_PREFIX; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_MAX_RETRIES; -import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME; @Setter @@ -64,7 +64,6 @@ public class DorisConfig implements Serializable { private String username; private String password; private Integer queryPort; - private String tableIdentifier; private int batchSize; // source option @@ -89,6 +88,7 @@ public class DorisConfig implements Serializable { private Integer bufferSize; private Integer bufferCount; private Properties streamLoadProps; + private boolean needsUnsupportedTypeCasting; // create table option private String createTableTemplate; @@ -105,7 +105,6 @@ public class DorisConfig implements Serializable { dorisConfig.setFrontends(config.get(FENODES)); dorisConfig.setUsername(config.get(USERNAME)); dorisConfig.setPassword(config.get(PASSWORD)); - dorisConfig.setTableIdentifier(config.get(TABLE_IDENTIFIER)); dorisConfig.setQueryPort(config.get(QUERY_PORT)); dorisConfig.setStreamLoadProps(parseStreamLoadProperties(config)); @@ -129,6 +128,7 @@ public class DorisConfig implements Serializable { dorisConfig.setBufferSize(config.get(SINK_BUFFER_SIZE)); dorisConfig.setBufferCount(config.get(SINK_BUFFER_COUNT)); dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE)); + dorisConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING)); // create table option dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE)); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java index 04a4f02851..7dd4612c48 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java @@ -17,17 +17,18 @@ package org.apache.seatunnel.connectors.doris.config; -import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap; - import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SchemaSaveMode; import java.util.Map; +import static org.apache.seatunnel.api.sink.SinkCommonOptions.MULTI_TABLE_SINK_REPLICA; + public interface DorisOptions { - int DORIS_TABLET_SIZE_MIN = 1; int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE; int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000; int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000; @@ -36,32 +37,11 @@ public interface DorisOptions { Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false; int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64; int DORIS_BATCH_SIZE_DEFAULT = 1024; - long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L; int DEFAULT_SINK_CHECK_INTERVAL = 10000; int DEFAULT_SINK_MAX_RETRIES = 3; int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024; int DEFAULT_SINK_BUFFER_COUNT = 3; - Map<String, String> DEFAULT_CREATE_PROPERTIES = - ImmutableMap.of( - "replication_allocation", "tag.location.default: 3", - "storage_format", "V2", - "disable_auto_compaction", "false"); - - String DEFAULT_CREATE_TEMPLATE = - "CREATE TABLE ${table_identifier}\n" - + "(\n" - + "${column_definition}\n" - + ")\n" - + "ENGINE = ${engine_type}\n" - + "UNIQUE KEY (${key_columns})\n" - + "COMMENT ${table_comment}\n" - + "${partition_info}\n" - + "DISTRIBUTED BY HASH (${distribution_columns}) BUCKETS ${distribution_bucket}\n" - + "PROPERTIES (\n" - + "${properties}\n" - + ")\n"; - // common option Option<String> FENODES = Options.key("fenodes") @@ -75,11 +55,13 @@ public interface DorisOptions { .defaultValue(9030) .withDescription("doris query port"); + @Deprecated Option<String> TABLE_IDENTIFIER = Options.key("table.identifier") .stringType() .noDefaultValue() .withDescription("the doris table name."); + Option<String> USERNAME = Options.key("username") .stringType() @@ -90,6 +72,17 @@ public interface DorisOptions { .stringType() .noDefaultValue() .withDescription("the doris password."); + + Option<String> TABLE = + Options.key("table") + .stringType() + .noDefaultValue() + .withDescription("the doris table name."); + Option<String> DATABASE = + Options.key("database") + .stringType() + .noDefaultValue() + .withDescription("the doris database name."); Option<Integer> DORIS_BATCH_SIZE = Options.key("doris.batch.size") .intType() @@ -197,6 +190,28 @@ public interface DorisOptions { .defaultValue("information_schema") .withDescription(""); + Option<SchemaSaveMode> SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + Option<DataSaveMode> DATA_SAVE_MODE = + Options.key("data_save_mode") + .enumType(DataSaveMode.class) + .defaultValue(DataSaveMode.APPEND_DATA) + .withDescription("data_save_mode"); + + Option<String> CUSTOM_SQL = + Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql"); + + Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING = + Options.key("needs_unsupported_type_casting") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable the unsupported type casting, such as Decimal64 to Double"); + // create table Option<String> SAVE_MODE_CREATE_TEMPLATE = Options.key("save_mode_create_template") @@ -205,17 +220,38 @@ public interface DorisOptions { "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + "${rowtype_fields}\n" + ") ENGINE=OLAP\n" - + "UNIQUE KEY (${rowtype_primary_key})\n" - + "DISTRIBUTED BY HASH (${rowtype_primary_key})\n" + + " UNIQUE KEY (${rowtype_primary_key})\n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key})\n " + "PROPERTIES (\n" - + " \"replication_num\" = \"1\" \n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + ")") .withDescription("Create table statement template, used to create Doris table"); OptionRule.Builder SINK_RULE = OptionRule.builder() - .required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER) - .optional(DORIS_BATCH_SIZE); + .required( + FENODES, + USERNAME, + PASSWORD, + SINK_LABEL_PREFIX, + DORIS_SINK_CONFIG_PREFIX, + DATA_SAVE_MODE, + SCHEMA_SAVE_MODE) + .optional( + DATABASE, + TABLE, + TABLE_IDENTIFIER, + QUERY_PORT, + DORIS_BATCH_SIZE, + SINK_ENABLE_2PC, + SINK_ENABLE_DELETE, + MULTI_TABLE_SINK_REPLICA, + SAVE_MODE_CREATE_TEMPLATE, + NEEDS_UNSUPPORTED_TYPE_CASTING) + .conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL); OptionRule.Builder CATALOG_RULE = OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java deleted file mode 100644 index 884cd7bde4..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java +++ /dev/null @@ -1,147 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.seatunnel.connectors.doris.rest; - -import java.io.Serializable; -import java.util.Collections; -import java.util.HashSet; -import java.util.Objects; -import java.util.Set; - -/** Doris partition info. */ -public class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> { - private final String database; - private final String table; - - private final String beAddress; - private final Set<Long> tabletIds; - private final String queryPlan; - - public PartitionDefinition( - String database, String table, String beAddress, Set<Long> tabletIds, String queryPlan) - throws IllegalArgumentException { - this.database = database; - this.table = table; - this.beAddress = beAddress; - this.tabletIds = tabletIds; - this.queryPlan = queryPlan; - } - - public String getBeAddress() { - return beAddress; - } - - public Set<Long> getTabletIds() { - return tabletIds; - } - - public String getDatabase() { - return database; - } - - public String getTable() { - return table; - } - - public String getQueryPlan() { - return queryPlan; - } - - @Override - public int compareTo(PartitionDefinition o) { - int cmp = database.compareTo(o.database); - if (cmp != 0) { - return cmp; - } - cmp = table.compareTo(o.table); - if (cmp != 0) { - return cmp; - } - cmp = beAddress.compareTo(o.beAddress); - if (cmp != 0) { - return cmp; - } - cmp = queryPlan.compareTo(o.queryPlan); - if (cmp != 0) { - return cmp; - } - - cmp = tabletIds.size() - o.tabletIds.size(); - if (cmp != 0) { - return cmp; - } - - Set<Long> similar = new HashSet<>(tabletIds); - Set<Long> diffSelf = new HashSet<>(tabletIds); - Set<Long> diffOther = new HashSet<>(o.tabletIds); - similar.retainAll(o.tabletIds); - diffSelf.removeAll(similar); - diffOther.removeAll(similar); - if (diffSelf.size() == 0) { - return 0; - } - long diff = Collections.min(diffSelf) - Collections.min(diffOther); - return diff < 0 ? -1 : 1; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - PartitionDefinition that = (PartitionDefinition) o; - return Objects.equals(database, that.database) - && Objects.equals(table, that.table) - && Objects.equals(beAddress, that.beAddress) - && Objects.equals(tabletIds, that.tabletIds) - && Objects.equals(queryPlan, that.queryPlan); - } - - @Override - public int hashCode() { - int result = database.hashCode(); - result = 31 * result + table.hashCode(); - result = 31 * result + beAddress.hashCode(); - result = 31 * result + queryPlan.hashCode(); - result = 31 * result + tabletIds.hashCode(); - return result; - } - - @Override - public String toString() { - return "PartitionDefinition{" - + ", database='" - + database - + '\'' - + ", table='" - + table - + '\'' - + ", beAddress='" - + beAddress - + '\'' - + ", tabletIds=" - + tabletIds - + ", queryPlan='" - + queryPlan - + '\'' - + '}'; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index e1c83a0700..e8b66d4497 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -21,12 +21,7 @@ import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.config.DorisOptions; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; -import org.apache.seatunnel.connectors.doris.rest.models.Backend; -import org.apache.seatunnel.connectors.doris.rest.models.BackendRow; import org.apache.seatunnel.connectors.doris.rest.models.BackendV2; -import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan; -import org.apache.seatunnel.connectors.doris.rest.models.Schema; -import org.apache.seatunnel.connectors.doris.rest.models.Tablet; import org.apache.seatunnel.connectors.doris.util.ErrorMessages; import org.apache.commons.io.IOUtils; @@ -36,7 +31,6 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.entity.StringEntity; import org.slf4j.Logger; @@ -55,30 +49,16 @@ import java.io.Serializable; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; @Slf4j public class RestService implements Serializable { - public static final int REST_RESPONSE_STATUS_OK = 200; - public static final int REST_RESPONSE_CODE_OK = 0; - private static final String REST_RESPONSE_BE_ROWS_KEY = "rows"; - private static final String API_PREFIX = "/api"; - private static final String SCHEMA = "_schema"; - private static final String QUERY_PLAN = "_query_plan"; - private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS"; - @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; - private static final String FE_LOGIN = "/rest/v1/login"; - private static final String BASE_URL = "http://%s%s"; private static String send(DorisConfig dorisConfig, HttpRequestBase request, Logger logger) throws DorisConnectorException { @@ -133,7 +113,7 @@ public class RestService implements Serializable { dorisConfig.getPassword(), logger); } - if (response == null) { + if (StringUtils.isEmpty(response)) { logger.warn( "Failed to get response from Doris FE {}, http code is {}", request.getURI(), @@ -233,43 +213,6 @@ public class RestService implements Serializable { return result.toString(); } - @VisibleForTesting - static String[] parseIdentifier(String tableIdentifier, Logger logger) - throws DorisConnectorException { - logger.trace("Parse identifier '{}'.", tableIdentifier); - if (StringUtils.isEmpty(tableIdentifier)) { - String errMsg = - String.format( - ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, - "table.identifier", - tableIdentifier); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - String[] identifier = tableIdentifier.split("\\."); - if (identifier.length != 2) { - String errMsg = - String.format( - ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, - "table.identifier", - tableIdentifier); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - return identifier; - } - - @VisibleForTesting - static String randomEndpoint(String feNodes, Logger logger) throws DorisConnectorException { - logger.trace("Parse fenodes '{}'.", feNodes); - if (StringUtils.isEmpty(feNodes)) { - String errMsg = - String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - List<String> nodes = Arrays.asList(feNodes.split(",")); - Collections.shuffle(nodes); - return nodes.get(0).trim(); - } - @VisibleForTesting static List<String> allEndpoints(String feNodes, Logger logger) throws DorisConnectorException { logger.trace("Parse fenodes '{}'.", feNodes); @@ -286,7 +229,7 @@ public class RestService implements Serializable { @VisibleForTesting public static String randomBackend(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException, IOException { + throws DorisConnectorException { List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { @@ -311,59 +254,9 @@ public class RestService implements Serializable { } } - @Deprecated - @VisibleForTesting - static List<BackendRow> getBackends(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException, IOException { - String feNodes = dorisConfig.getFrontends(); - String feNode = randomEndpoint(feNodes, logger); - String beUrl = String.format(BASE_URL, feNode, BACKENDS); - HttpGet httpGet = new HttpGet(beUrl); - String response = send(dorisConfig, httpGet, logger); - logger.info("Backend Info:{}", response); - List<BackendRow> backends = parseBackend(response, logger); - return backends; - } - - @Deprecated - static List<BackendRow> parseBackend(String response, Logger logger) - throws DorisConnectorException, IOException { - ObjectMapper mapper = new ObjectMapper(); - Backend backend; - try { - backend = mapper.readValue(response, Backend.class); - } catch (JsonParseException e) { - String errMsg = "Doris BE's response is not a json. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (JsonMappingException e) { - String errMsg = "Doris BE's response cannot map to schema. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (IOException e) { - String errMsg = "Parse Doris BE's response to json failed. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } - - if (backend == null) { - logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, - ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE); - } - List<BackendRow> backendRows = - backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList()); - logger.debug("Parsing schema result is '{}'.", backendRows); - return backendRows; - } - @VisibleForTesting public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException, IOException { + throws DorisConnectorException { String feNodes = dorisConfig.getFrontends(); List<String> feNodeList = allEndpoints(feNodes, logger); for (String feNode : feNodeList) { @@ -385,7 +278,7 @@ public class RestService implements Serializable { } static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) - throws DorisConnectorException, IOException { + throws DorisConnectorException { ObjectMapper mapper = new ObjectMapper(); BackendV2 backend; try { @@ -416,262 +309,4 @@ public class RestService implements Serializable { logger.debug("Parsing schema result is '{}'.", backendRows); return backendRows; } - - @VisibleForTesting - static String getUriStr(DorisConfig dorisConfig, Logger logger) throws DorisConnectorException { - String[] identifier = parseIdentifier(dorisConfig.getTableIdentifier(), logger); - return "http://" - + randomEndpoint(dorisConfig.getFrontends(), logger) - + API_PREFIX - + "/" - + identifier[0] - + "/" - + identifier[1] - + "/"; - } - - public static Schema getSchema(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException { - logger.trace("Finding schema."); - HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA); - String response = send(dorisConfig, httpGet, logger); - logger.debug("Find schema response is '{}'.", response); - return parseSchema(response, logger); - } - - public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException { - try { - return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig, logger).getKeysType()); - } catch (Exception e) { - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e); - } - } - - @VisibleForTesting - public static Schema parseSchema(String response, Logger logger) - throws DorisConnectorException { - logger.trace("Parse response '{}' to schema.", response); - ObjectMapper mapper = new ObjectMapper(); - Schema schema; - try { - schema = mapper.readValue(response, Schema.class); - } catch (JsonParseException e) { - String errMsg = "Doris FE's response is not a json. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (JsonMappingException e) { - String errMsg = "Doris FE's response cannot map to schema. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (IOException e) { - String errMsg = "Parse Doris FE's response to json failed. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } - - if (schema == null) { - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, - ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE); - } - - if (schema.getStatus() != REST_RESPONSE_STATUS_OK) { - String errMsg = "Doris FE's response is not OK, status is " + schema.getStatus(); - logger.error(errMsg); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - logger.debug("Parsing schema result is '{}'.", schema); - return schema; - } - - public static List<PartitionDefinition> findPartitions(DorisConfig dorisConfig, Logger logger) - throws DorisConnectorException { - String[] tableIdentifiers = parseIdentifier(dorisConfig.getTableIdentifier(), logger); - String readFields = - StringUtils.isBlank(dorisConfig.getReadField()) ? "*" : dorisConfig.getReadField(); - String sql = - "select " - + readFields - + " from `" - + tableIdentifiers[0] - + "`.`" - + tableIdentifiers[1] - + "`"; - if (!StringUtils.isEmpty(dorisConfig.getFilterQuery())) { - sql += " where " + dorisConfig.getFilterQuery(); - } - logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); - - HttpPost httpPost = new HttpPost(getUriStr(dorisConfig, logger) + QUERY_PLAN); - String entity = "{\"sql\": \"" + sql + "\"}"; - logger.debug("Post body Sending to Doris FE is: '{}'.", entity); - StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); - stringEntity.setContentEncoding("UTF-8"); - stringEntity.setContentType("application/json"); - httpPost.setEntity(stringEntity); - - String resStr = send(dorisConfig, httpPost, logger); - logger.debug("Find partition response is '{}'.", resStr); - QueryPlan queryPlan = getQueryPlan(resStr, logger); - Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger); - return tabletsMapToPartition( - dorisConfig, - be2Tablets, - queryPlan.getOpaquedQueryPlan(), - tableIdentifiers[0], - tableIdentifiers[1], - logger); - } - - @VisibleForTesting - static QueryPlan getQueryPlan(String response, Logger logger) throws DorisConnectorException { - ObjectMapper mapper = new ObjectMapper(); - QueryPlan queryPlan; - try { - queryPlan = mapper.readValue(response, QueryPlan.class); - } catch (JsonParseException e) { - String errMsg = "Doris FE's response is not a json. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (JsonMappingException e) { - String errMsg = "Doris FE's response cannot map to schema. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } catch (IOException e) { - String errMsg = "Parse Doris FE's response to json failed. res: " + response; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } - - if (queryPlan == null) { - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, - ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE); - } - - if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) { - String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus(); - logger.error(errMsg); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - logger.debug("Parsing partition result is '{}'.", queryPlan); - return queryPlan; - } - - @VisibleForTesting - static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) - throws DorisConnectorException { - Map<String, List<Long>> be2Tablets = new HashMap<>(); - for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) { - logger.debug("Parse tablet info: '{}'.", part); - long tabletId; - try { - tabletId = Long.parseLong(part.getKey()); - } catch (NumberFormatException e) { - String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed."; - logger.error(errMsg, e); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e); - } - String target = null; - int tabletCount = Integer.MAX_VALUE; - for (String candidate : part.getValue().getRoutings()) { - logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId); - if (!be2Tablets.containsKey(candidate)) { - logger.debug( - "Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId); - List<Long> tablets = new ArrayList<>(); - be2Tablets.put(candidate, tablets); - target = candidate; - break; - } else { - if (be2Tablets.get(candidate).size() < tabletCount) { - target = candidate; - tabletCount = be2Tablets.get(candidate).size(); - logger.debug( - "Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.", - tabletId, - target, - tabletCount); - } - } - } - if (target == null) { - String errMsg = "Cannot choice Doris BE for tablet " + tabletId; - logger.error(errMsg); - throw new DorisConnectorException( - DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - - logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, tabletId); - be2Tablets.get(target).add(tabletId); - } - return be2Tablets; - } - - @VisibleForTesting - static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger logger) { - int tabletsSize = DorisOptions.DORIS_TABLET_SIZE_DEFAULT; - if (dorisConfig.getTabletSize() != null) { - tabletsSize = dorisConfig.getTabletSize(); - } - if (tabletsSize < DorisOptions.DORIS_TABLET_SIZE_MIN) { - logger.warn( - "{} is less than {}, set to default value {}.", - DorisOptions.DORIS_TABLET_SIZE, - DorisOptions.DORIS_TABLET_SIZE_MIN, - DorisOptions.DORIS_TABLET_SIZE_MIN); - tabletsSize = DorisOptions.DORIS_TABLET_SIZE_MIN; - } - logger.debug("Tablet size is set to {}.", tabletsSize); - return tabletsSize; - } - - @VisibleForTesting - static List<PartitionDefinition> tabletsMapToPartition( - DorisConfig dorisConfig, - Map<String, List<Long>> be2Tablets, - String opaquedQueryPlan, - String database, - String table, - Logger logger) - throws DorisConnectorException { - int tabletsSize = tabletCountLimitForOnePartition(dorisConfig, logger); - List<PartitionDefinition> partitions = new ArrayList<>(); - for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) { - logger.debug("Generate partition with beInfo: '{}'.", beInfo); - HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue()); - beInfo.getValue().clear(); - beInfo.getValue().addAll(tabletSet); - int first = 0; - while (first < beInfo.getValue().size()) { - Set<Long> partitionTablets = - new HashSet<>( - beInfo.getValue() - .subList( - first, - Math.min( - beInfo.getValue().size(), - first + tabletsSize))); - first = first + tabletsSize; - PartitionDefinition partitionDefinition = - new PartitionDefinition( - database, - table, - beInfo.getKey(), - partitionTablets, - opaquedQueryPlan); - logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition); - partitions.add(partitionDefinition); - } - } - return partitions; - } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java deleted file mode 100644 index f151a0e727..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -/** Be response model */ -@Deprecated -@JsonIgnoreProperties(ignoreUnknown = true) -public class Backend { - - @JsonProperty(value = "rows") - private List<BackendRow> rows; - - public List<BackendRow> getRows() { - return rows; - } - - public void setRows(List<BackendRow> rows) { - this.rows = rows; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java deleted file mode 100644 index fe2260bea7..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -@Getter -@Setter -@ToString -@Deprecated -@JsonIgnoreProperties(ignoreUnknown = true) -public class BackendRow { - - @JsonProperty(value = "HttpPort") - private String httpPort; - - @JsonProperty(value = "IP") - private String ip; - - @JsonProperty(value = "Alive") - private Boolean alive; -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java deleted file mode 100644 index 8c9d00d01a..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import java.util.Objects; - -public class Field { - private String name; - private String type; - private String comment; - private int precision; - private int scale; - private String aggregationType; - - public Field() {} - - public Field( - String name, - String type, - String comment, - int precision, - int scale, - String aggregationType) { - this.name = name; - this.type = type; - this.comment = comment; - this.precision = precision; - this.scale = scale; - this.aggregationType = aggregationType; - } - - public String getAggregationType() { - return aggregationType; - } - - public void setAggregationType(String aggregationType) { - this.aggregationType = aggregationType; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getComment() { - return comment; - } - - public void setComment(String comment) { - this.comment = comment; - } - - public int getPrecision() { - return precision; - } - - public void setPrecision(int precision) { - this.precision = precision; - } - - public int getScale() { - return scale; - } - - public void setScale(int scale) { - this.scale = scale; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Field field = (Field) o; - return precision == field.precision - && scale == field.scale - && Objects.equals(name, field.name) - && Objects.equals(type, field.type) - && Objects.equals(comment, field.comment); - } - - @Override - public int hashCode() { - return Objects.hash(name, type, comment, precision, scale); - } - - @Override - public String toString() { - return "Field{" - + "name='" - + name - + '\'' - + ", type='" - + type - + '\'' - + ", comment='" - + comment - + '\'' - + ", precision=" - + precision - + ", scale=" - + scale - + '}'; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java deleted file mode 100644 index d59c6124cd..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import java.util.Map; -import java.util.Objects; - -public class QueryPlan { - private int status; - private String opaquedQueryPlan; - private Map<String, Tablet> partitions; - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getOpaquedQueryPlan() { - return opaquedQueryPlan; - } - - public void setOpaquedQueryPlan(String opaquedQueryPlan) { - this.opaquedQueryPlan = opaquedQueryPlan; - } - - public Map<String, Tablet> getPartitions() { - return partitions; - } - - public void setPartitions(Map<String, Tablet> partitions) { - this.partitions = partitions; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - QueryPlan queryPlan = (QueryPlan) o; - return status == queryPlan.status - && Objects.equals(opaquedQueryPlan, queryPlan.opaquedQueryPlan) - && Objects.equals(partitions, queryPlan.partitions); - } - - @Override - public int hashCode() { - return Objects.hash(status, opaquedQueryPlan, partitions); - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java deleted file mode 100644 index 60e06bba97..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -public class Schema { - private int status = 0; - private String keysType; - private List<Field> properties; - - public Schema() { - properties = new ArrayList<>(); - } - - public Schema(int fieldCount) { - properties = new ArrayList<>(fieldCount); - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getKeysType() { - return keysType; - } - - public void setKeysType(String keysType) { - this.keysType = keysType; - } - - public List<Field> getProperties() { - return properties; - } - - public void setProperties(List<Field> properties) { - this.properties = properties; - } - - public void put( - String name, - String type, - String comment, - int scale, - int precision, - String aggregationType) { - properties.add(new Field(name, type, comment, scale, precision, aggregationType)); - } - - public void put(Field f) { - properties.add(f); - } - - public Field get(int index) { - if (index >= properties.size()) { - throw new IndexOutOfBoundsException( - "Index: " + index + ", Fields size:" + properties.size()); - } - return properties.get(index); - } - - public int size() { - return properties.size(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Schema schema = (Schema) o; - return status == schema.status && Objects.equals(properties, schema.properties); - } - - @Override - public int hashCode() { - return Objects.hash(status, properties); - } - - @Override - public String toString() { - return "Schema{" + "status=" + status + ", properties=" + properties + '}'; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java deleted file mode 100644 index cb52490997..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import java.util.List; -import java.util.Objects; - -public class Tablet { - private List<String> routings; - private int version; - private long versionHash; - private long schemaHash; - - public List<String> getRoutings() { - return routings; - } - - public void setRoutings(List<String> routings) { - this.routings = routings; - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - public long getVersionHash() { - return versionHash; - } - - public void setVersionHash(long versionHash) { - this.versionHash = versionHash; - } - - public long getSchemaHash() { - return schemaHash; - } - - public void setSchemaHash(long schemaHash) { - this.schemaHash = schemaHash; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Tablet tablet = (Tablet) o; - return version == tablet.version - && versionHash == tablet.versionHash - && schemaHash == tablet.schemaHash - && Objects.equals(routings, tablet.routings); - } - - @Override - public int hashCode() { - return Objects.hash(routings, version, versionHash, schemaHash); - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java index 8a45e34cef..e0ef8e1893 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java @@ -18,17 +18,24 @@ package org.apache.seatunnel.connectors.doris.sink; import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.factory.CatalogFactory; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter; @@ -41,17 +48,22 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory; + public class DorisSink implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo>, - SupportSaveMode { + SupportSaveMode, + SupportMultiTableSink { private final DorisConfig dorisConfig; - private final SeaTunnelRowType seaTunnelRowType; + private final ReadonlyConfig config; + private final CatalogTable catalogTable; private String jobId; public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) { + this.config = config; + this.catalogTable = catalogTable; this.dorisConfig = DorisConfig.of(config); - this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); } @Override @@ -68,13 +80,13 @@ public class DorisSink public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter( SinkWriter.Context context) throws IOException { return new DorisSinkWriter( - context, Collections.emptyList(), seaTunnelRowType, dorisConfig, jobId); + context, Collections.emptyList(), catalogTable, dorisConfig, jobId); } @Override public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter( SinkWriter.Context context, List<DorisSinkState> states) throws IOException { - return new DorisSinkWriter(context, states, seaTunnelRowType, dorisConfig, jobId); + return new DorisSinkWriter(context, states, catalogTable, dorisConfig, jobId); } @Override @@ -94,6 +106,27 @@ public class DorisSink @Override public Optional<SaveModeHandler> getSaveModeHandler() { - return Optional.empty(); + CatalogFactory catalogFactory = + discoverFactory( + Thread.currentThread().getContextClassLoader(), + CatalogFactory.class, + "Doris"); + if (catalogFactory == null) { + throw new DorisConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, "Cannot find Doris catalog factory")); + } + + Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config); + catalog.open(); + return Optional.of( + new DefaultSaveModeHandler( + config.get(DorisOptions.SCHEMA_SAVE_MODE), + config.get(DorisOptions.DATA_SAVE_MODE), + catalog, + catalogTable, + config.get(DorisOptions.CUSTOM_SQL))); } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java index 6c001c6244..35b0b8da5e 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.doris.sink; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -28,9 +29,20 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.doris.config.DorisOptions; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo; import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState; +import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; + +import org.apache.commons.lang3.StringUtils; import com.google.auto.service.AutoService; +import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY; +import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY; +import static org.apache.seatunnel.api.sink.SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE; +import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER; + @AutoService(Factory.class) public class DorisSinkFactory implements TableSinkFactory< @@ -49,7 +61,50 @@ public class DorisSinkFactory public TableSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo> createSink( TableSinkFactoryContext context) { ReadonlyConfig config = context.getOptions(); - CatalogTable catalogTable = context.getCatalogTable(); - return () -> new DorisSink(config, catalogTable); + CatalogTable catalogTable = + config.get(NEEDS_UNSUPPORTED_TYPE_CASTING) + ? UnsupportedTypeConverterUtils.convertCatalogTable( + context.getCatalogTable()) + : context.getCatalogTable(); + final CatalogTable finalCatalogTable = this.renameCatalogTable(config, catalogTable); + return () -> new DorisSink(config, finalCatalogTable); + } + + private CatalogTable renameCatalogTable(ReadonlyConfig options, CatalogTable catalogTable) { + TableIdentifier tableId = catalogTable.getTableId(); + String tableName; + String databaseName; + String tableIdentifier = options.get(TABLE_IDENTIFIER); + if (StringUtils.isNotEmpty(tableIdentifier)) { + tableName = tableIdentifier.split("\\.")[1]; + databaseName = tableIdentifier.split("\\.")[0]; + } else { + if (StringUtils.isNotEmpty(options.get(TABLE))) { + tableName = replaceName(options.get(TABLE), tableId); + } else { + tableName = tableId.getTableName(); + } + if (StringUtils.isNotEmpty(options.get(DATABASE))) { + databaseName = replaceName(options.get(DATABASE), tableId); + } else { + databaseName = tableId.getDatabaseName(); + } + } + TableIdentifier newTableId = + TableIdentifier.of(tableId.getCatalogName(), databaseName, null, tableName); + return CatalogTable.of(newTableId, catalogTable); + } + + private String replaceName(String original, TableIdentifier tableId) { + if (tableId.getTableName() != null) { + original = original.replace(REPLACE_TABLE_NAME_KEY, tableId.getTableName()); + } + if (tableId.getSchemaName() != null) { + original = original.replace(REPLACE_SCHEMA_NAME_KEY, tableId.getSchemaName()); + } + if (tableId.getDatabaseName() != null) { + original = original.replace(REPLACE_DATABASE_NAME_KEY, tableId.getDatabaseName()); + } + return original; } } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java index 03179f92e5..c3c5725b32 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java @@ -22,11 +22,13 @@ import lombok.Getter; import lombok.Setter; import lombok.ToString; +import java.io.Serializable; + @Setter @Getter @ToString @EqualsAndHashCode -public class DorisSinkState { +public class DorisSinkState implements Serializable { private final String labelPrefix; private final long checkpointId; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index 2f35357e5b..0abdc6269c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -18,6 +18,8 @@ package org.apache.seatunnel.connectors.doris.sink.writer; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.doris.config.DorisConfig; @@ -31,6 +33,7 @@ import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer; import org.apache.seatunnel.connectors.doris.sink.LoadStatus; import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo; import org.apache.seatunnel.connectors.doris.util.HttpUtil; +import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -53,7 +56,9 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; @Slf4j -public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> { +public class DorisSinkWriter + implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>, + SupportMultiTableSinkWriter<Void> { private static final int INITIAL_DELAY = 200; private static final int CONNECT_TIMEOUT = 1000; private static final List<String> DORIS_SUCCESS_STATUS = @@ -66,6 +71,7 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo private final LabelGenerator labelGenerator; private final int intervalTime; private final DorisSerializer serializer; + private final CatalogTable catalogTable; private final transient ScheduledExecutorService scheduledExecutorService; private transient Thread executorThread; private transient volatile Exception loadException = null; @@ -75,21 +81,28 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo public DorisSinkWriter( SinkWriter.Context context, List<DorisSinkState> state, - SeaTunnelRowType seaTunnelRowType, + CatalogTable catalogTable, DorisConfig dorisConfig, String jobId) throws IOException { this.dorisConfig = dorisConfig; + this.catalogTable = catalogTable; this.lastCheckpointId = !state.isEmpty() ? state.get(0).getCheckpointId() : 0; log.info("restore checkpointId {}", lastCheckpointId); log.info("labelPrefix " + dorisConfig.getLabelPrefix()); this.labelPrefix = - dorisConfig.getLabelPrefix() + "_" + jobId + "_" + context.getIndexOfSubtask(); + dorisConfig.getLabelPrefix() + + "_" + + catalogTable.getTablePath().getFullName().replaceAll("\\.", "_") + + "_" + + jobId + + "_" + + context.getIndexOfSubtask(); this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC()); this.scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build()); - this.serializer = createSerializer(dorisConfig, seaTunnelRowType); + this.serializer = createSerializer(dorisConfig, catalogTable.getSeaTunnelRowType()); this.intervalTime = dorisConfig.getCheckInterval(); this.loading = false; this.initializeLoad(); @@ -101,7 +114,11 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo try { this.dorisStreamLoad = new DorisStreamLoad( - backend, dorisConfig, labelGenerator, new HttpUtil().getHttpClient()); + backend, + catalogTable.getTablePath(), + dorisConfig, + labelGenerator, + new HttpUtil().getHttpClient()); if (dorisConfig.getEnable2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); } @@ -120,7 +137,11 @@ public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo @Override public void write(SeaTunnelRow element) throws IOException { checkLoadException(); - byte[] serialize = serializer.serialize(element); + byte[] serialize = + serializer.serialize( + dorisConfig.isNeedsUnsupportedTypeCasting() + ? UnsupportedTypeConverterUtils.convertRow(element) + : element); if (Objects.isNull(serialize)) { return; } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java index 892c1da954..00f21e9ae5 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.doris.sink.writer; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.doris.config.DorisConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; @@ -82,13 +83,13 @@ public class DorisStreamLoad implements Serializable { public DorisStreamLoad( String hostPort, + TablePath tablePath, DorisConfig dorisConfig, LabelGenerator labelGenerator, CloseableHttpClient httpClient) { this.hostPort = hostPort; - String[] tableInfo = dorisConfig.getTableIdentifier().split("\\."); - this.db = tableInfo[0]; - this.table = tableInfo[1]; + this.db = tablePath.getDatabaseName(); + this.table = tablePath.getTableName(); this.user = dorisConfig.getUsername(); this.passwd = dorisConfig.getPassword(); this.labelGenerator = labelGenerator; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java index fc1884efcf..3f890af855 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.LocalTimeType; @@ -34,11 +35,14 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.checkNotNull; + public class DorisCatalogUtil { public static final String ALL_DATABASES_QUERY = @@ -109,10 +113,22 @@ public class DorisCatalogUtil { .map(r -> "`" + r + "`") .collect(Collectors.joining(",")); } + String uniqueKey = ""; + if (!tableSchema.getConstraintKeys().isEmpty()) { + uniqueKey = + tableSchema.getConstraintKeys().stream() + .flatMap(c -> c.getColumnNames().stream()) + .map(r -> "`" + r.getColumnName() + "`") + .collect(Collectors.joining(",")); + } template = template.replaceAll( String.format("\\$\\{%s\\}", SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey); + template = + template.replaceAll( + String.format("\\$\\{%s\\}", SaveModeConstants.ROWTYPE_UNIQUE_KEY), + uniqueKey); Map<String, CreateTableParser.ColumnInfo> columnInTemplate = CreateTableParser.getColumnList(template); template = mergeColumnInTemplate(columnInTemplate, tableSchema, template); @@ -120,15 +136,7 @@ public class DorisCatalogUtil { String rowTypeFields = tableSchema.getColumns().stream() .filter(column -> !columnInTemplate.containsKey(column.getName())) - .map( - column -> - String.format( - "`%s` %s %s ", - column.getName(), - fromSeaTunnelType( - column.getDataType(), - column.getColumnLength()), - column.isNullable() ? "NULL" : "NOT NULL")) + .map(DorisCatalogUtil::columnToDorisType) .collect(Collectors.joining(",\n")); return template.replaceAll( String.format("\\$\\{%s\\}", SaveModeConstants.DATABASE), @@ -149,18 +157,18 @@ public class DorisCatalogUtil { Map<String, Column> columnMap = tableSchema.getColumns().stream() .collect(Collectors.toMap(Column::getName, Function.identity())); - for (String col : columnInTemplate.keySet()) { - CreateTableParser.ColumnInfo columnInfo = columnInTemplate.get(col); + List<CreateTableParser.ColumnInfo> columnInfosInSeq = + columnInTemplate.values().stream() + .sorted( + Comparator.comparingInt( + CreateTableParser.ColumnInfo::getStartIndex)) + .collect(Collectors.toList()); + for (CreateTableParser.ColumnInfo columnInfo : columnInfosInSeq) { + String col = columnInfo.getName(); if (StringUtils.isEmpty(columnInfo.getInfo())) { if (columnMap.containsKey(col)) { Column column = columnMap.get(col); - String newCol = - String.format( - "`%s` %s %s ", - column.getName(), - fromSeaTunnelType( - column.getDataType(), column.getColumnLength()), - column.isNullable() ? "NULL" : "NOT NULL"); + String newCol = columnToDorisType(column); String prefix = template.substring(0, columnInfo.getStartIndex() + offset); String suffix = template.substring(offset + columnInfo.getEndIndex()); if (prefix.endsWith("`")) { @@ -181,6 +189,21 @@ public class DorisCatalogUtil { return template; } + private static String columnToDorisType(Column column) { + checkNotNull(column, "The column is required."); + return String.format( + "`%s` %s %s ", + column.getName(), + fromSeaTunnelType( + column.getDataType(), + Math.max( + column.getColumnLength() == null ? 0 : column.getColumnLength(), + column.getLongColumnLength() == null + ? 0 + : column.getLongColumnLength())), + column.isNullable() ? "NULL" : "NOT NULL"); + } + public static SeaTunnelDataType<?> fromDorisType(ResultSet rs) throws SQLException { String type = rs.getString(5).toUpperCase(); @@ -214,6 +237,7 @@ public class DorisCatalogUtil { return LocalTimeType.LOCAL_DATE_TIME_TYPE; case "DECIMAL": case "DECIMALV2": + case "DECIMALV3": int precision = rs.getInt(8); int scale = rs.getInt(9); return new DecimalType(precision, scale); @@ -233,14 +257,16 @@ public class DorisCatalogUtil { } } - private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType, Integer columnLength) { + private static String fromSeaTunnelType(SeaTunnelDataType<?> dataType, Long columnLength) { switch (dataType.getSqlType()) { case STRING: - if (columnLength != null && columnLength > 65533) { - return "STRING"; + if (columnLength != null && columnLength <= 65533 && columnLength > 0) { + return String.format("VARCHAR(%d)", columnLength); } - return String.format("VARCHAR(%d)", columnLength); + return "STRING"; + case BYTES: + return "STRING"; case NULL: return "NULL_TYPE"; case BOOLEAN: @@ -260,13 +286,21 @@ public class DorisCatalogUtil { return String.format( "DECIMALV3(%d,%d)", decimalType.getPrecision(), decimalType.getScale()); case TIME: - return "TIME"; + return "VARCHAR(8)"; case DATE: return "DATEV2"; case TIMESTAMP: return "DATETIME"; + case ARRAY: + return "ARRAY<" + + fromSeaTunnelType( + ((ArrayType<?, ?>) dataType).getElementType(), Long.MAX_VALUE) + + ">"; + case MAP: case ROW: return "JSONB"; + case TINYINT: + return "TINYINT"; default: throw new CatalogException(String.format("Unsupported doris type: %s", dataType)); } diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java deleted file mode 100644 index 3e914d7d2a..0000000000 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.util; - -import java.io.IOException; -import java.io.StringReader; -import java.io.StringWriter; -import java.util.Properties; - -public class IOUtils { - public static String propsToString(Properties props) throws IllegalArgumentException { - StringWriter sw = new StringWriter(); - if (props != null) { - try { - props.store(sw, ""); - } catch (IOException ex) { - throw new IllegalArgumentException("Cannot parse props to String.", ex); - } - } - return sw.toString(); - } - - public static Properties propsFromString(String source) throws IllegalArgumentException { - Properties copy = new Properties(); - if (source != null) { - try { - copy.load(new StringReader(source)); - } catch (IOException ex) { - throw new IllegalArgumentException("Cannot parse props from String.", ex); - } - } - return copy; - } -} diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java new file mode 100644 index 0000000000..bcf0ed534f --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/UnsupportedTypeConverterUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.util; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SqlType; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE; + +public class UnsupportedTypeConverterUtils { + public static Object convertBigDecimal(BigDecimal bigDecimal) { + if (bigDecimal.precision() > 38) { + return bigDecimal.doubleValue(); + } + return bigDecimal; + } + + public static SeaTunnelRow convertRow(SeaTunnelRow row) { + List<Object> newValues = + Arrays.stream(row.getFields()) + .map( + value -> { + if (value instanceof BigDecimal) { + return convertBigDecimal((BigDecimal) value); + } + return value; + }) + .collect(Collectors.toList()); + return new SeaTunnelRow(newValues.toArray()); + } + + public static CatalogTable convertCatalogTable(CatalogTable catalogTable) { + TableSchema tableSchema = catalogTable.getTableSchema(); + List<Column> columns = tableSchema.getColumns(); + List<Column> newColumns = + columns.stream() + .map( + column -> { + if (column.getDataType().getSqlType().equals(SqlType.DECIMAL)) { + DecimalType decimalType = + (DecimalType) column.getDataType(); + if (decimalType.getPrecision() > 38) { + return PhysicalColumn.of( + column.getName(), + DOUBLE_TYPE, + 22, + column.isNullable(), + null, + column.getComment(), + "DOUBLE", + false, + false, + 0L, + column.getOptions(), + 22L); + } + } + return column; + }) + .collect(Collectors.toList()); + TableSchema newtableSchema = + TableSchema.builder() + .columns(newColumns) + .primaryKey(tableSchema.getPrimaryKey()) + .constraintKey(tableSchema.getConstraintKeys()) + .build(); + + return CatalogTable.of( + catalogTable.getTableId(), + newtableSchema, + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + } +} diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java new file mode 100644 index 0000000000..404351090e --- /dev/null +++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.catalog; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class DorisCreateTableTest { + + @Test + public void test() { + + List<Column> columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("create_time", BasicType.LONG_TYPE, null, true, null, "")); + + String result = + DorisCatalogUtil.getCreateTableStatement( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + + "${rowtype_primary_key} , \n" + + "${rowtype_unique_key} , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "${rowtype_fields} \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(${rowtype_primary_key},`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key}) \n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ")", + TablePath.of("test1.test2"), + CatalogTable.of( + TableIdentifier.of("test", "test1", "test2"), + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) + .constraintKey( + Arrays.asList( + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "name", + ConstraintKey + .ColumnSortType + .DESC))), + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key2", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "score", + ConstraintKey + .ColumnSortType + .ASC))))) + .columns(columns) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "")); + Assertions.assertEquals( + result, + "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n" + + "`id` BIGINT(1) NULL ,`age` INT(1) NULL , \n" + + "`name` STRING NULL ,`score` INT(1) NULL , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "`gender` TINYINT NULL \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(`id`,`age`,`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (`id`,`age`) \n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ")"); + } + + @Test + public void testInSeq() { + + List<Column> columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_LINENUMBER", BasicType.INT_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_QUANTITY", new DecimalType(15, 2), null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_EXTENDEDPRICE", new DecimalType(15, 2), null, false, null, "")); + columns.add(PhysicalColumn.of("L_DISCOUNT", new DecimalType(15, 2), null, false, null, "")); + columns.add(PhysicalColumn.of("L_TAX", new DecimalType(15, 2), null, false, null, "")); + columns.add( + PhysicalColumn.of("L_RETURNFLAG", BasicType.STRING_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of("L_LINESTATUS", BasicType.STRING_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_COMMITDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_RECEIPTDATE", LocalTimeType.LOCAL_DATE_TYPE, null, false, null, "")); + columns.add( + PhysicalColumn.of("L_SHIPINSTRUCT", BasicType.STRING_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_SHIPMODE", BasicType.STRING_TYPE, null, false, null, "")); + columns.add(PhysicalColumn.of("L_COMMENT", BasicType.STRING_TYPE, null, false, null, "")); + + String result = + DorisCatalogUtil.getCreateTableStatement( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (\n" + + "`L_COMMITDATE`,\n" + + "${rowtype_primary_key},\n" + + "L_SUPPKEY BIGINT NOT NULL,\n" + + "${rowtype_fields}\n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (L_COMMITDATE, ${rowtype_primary_key}, L_SUPPKEY)\n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key})" + + "PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")", + TablePath.of("tpch", "lineitem"), + CatalogTable.of( + TableIdentifier.of("test", "tpch", "lineitem"), + TableSchema.builder() + .primaryKey( + PrimaryKey.of( + "", + Arrays.asList( + "L_ORDERKEY", "L_LINENUMBER"))) + .columns(columns) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "")); + String expected = + "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" + + "`L_COMMITDATE` DATEV2 NOT NULL ,\n" + + "`L_ORDERKEY` INT(1) NOT NULL ,`L_LINENUMBER` INT(1) NOT NULL ,\n" + + "L_SUPPKEY BIGINT NOT NULL,\n" + + "`L_PARTKEY` INT(1) NOT NULL ,\n" + + "`L_QUANTITY` DECIMALV3(15,2) NOT NULL ,\n" + + "`L_EXTENDEDPRICE` DECIMALV3(15,2) NOT NULL ,\n" + + "`L_DISCOUNT` DECIMALV3(15,2) NOT NULL ,\n" + + "`L_TAX` DECIMALV3(15,2) NOT NULL ,\n" + + "`L_RETURNFLAG` STRING NOT NULL ,\n" + + "`L_LINESTATUS` STRING NOT NULL ,\n" + + "`L_SHIPDATE` DATEV2 NOT NULL ,\n" + + "`L_RECEIPTDATE` DATEV2 NOT NULL ,\n" + + "`L_SHIPINSTRUCT` STRING NOT NULL ,\n" + + "`L_SHIPMODE` STRING NOT NULL ,\n" + + "`L_COMMENT` STRING NOT NULL \n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (L_COMMITDATE, `L_ORDERKEY`,`L_LINENUMBER`, L_SUPPKEY)\n" + + "DISTRIBUTED BY HASH (`L_ORDERKEY`,`L_LINENUMBER`)PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")"; + Assertions.assertEquals(result, expected); + } + + @Test + public void testWithVarchar() { + + List<Column> columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, null, true, null, "")); + columns.add(PhysicalColumn.of("comment", BasicType.STRING_TYPE, 500, true, null, "")); + columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, "")); + + String result = + DorisCatalogUtil.getCreateTableStatement( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( \n" + + "${rowtype_primary_key} , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "${rowtype_fields} \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(${rowtype_primary_key},`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key}) \n" + + "PROPERTIES ( \n" + + " \"dynamic_partition.enable\" = \"true\", \n" + + " \"dynamic_partition.time_unit\" = \"DAY\", \n" + + " \"dynamic_partition.end\" = \"3\", \n" + + " \"dynamic_partition.prefix\" = \"p\" \n" + + ");", + TablePath.of("test1", "test2"), + CatalogTable.of( + TableIdentifier.of("test", "test1", "test2"), + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) + .columns(columns) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "")); + + Assertions.assertEquals( + result, + "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n" + + "`id` BIGINT(1) NULL ,`age` INT(1) NULL , \n" + + "`create_time` DATETIME NOT NULL , \n" + + "`name` STRING NULL ,\n" + + "`comment` VARCHAR(500) NULL ,\n" + + "`description` STRING NULL \n" + + ") ENGINE=OLAP \n" + + "PRIMARY KEY(`id`,`age`,`create_time`) \n" + + "PARTITION BY RANGE (`create_time`)( \n" + + " PARTITION p20230329 VALUES LESS THAN (\"2023-03-29\") \n" + + ") \n" + + "DISTRIBUTED BY HASH (`id`,`age`) \n" + + "PROPERTIES ( \n" + + " \"dynamic_partition.enable\" = \"true\", \n" + + " \"dynamic_partition.time_unit\" = \"DAY\", \n" + + " \"dynamic_partition.end\" = \"3\", \n" + + " \"dynamic_partition.prefix\" = \"p\" \n" + + ");"); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java index 4722b84025..275c714686 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java @@ -18,17 +18,21 @@ package org.apache.seatunnel.e2e.connector.doris; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog; import org.apache.seatunnel.connectors.doris.catalog.DorisCatalogFactory; import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -42,12 +46,32 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Slf4j public class DorisCatalogIT extends AbstractDorisIT { private static final String DATABASE = "test"; private static final String SINK_TABLE = "doris_catalog_e2e"; + private static final TablePath tablePath = TablePath.of(DATABASE, SINK_TABLE); + private static final CatalogTable catalogTable; + + static { + TableSchema.Builder builder = TableSchema.builder(); + builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false, 0, "k1")); + builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64, false, "", "k2")); + builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10, true, null, "v1")); + builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0, false, 0.1, "v2")); + builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2"))); + catalogTable = + CatalogTable.of( + TableIdentifier.of("doris", tablePath), + builder.build(), + Collections.emptyMap(), + Collections.emptyList(), + "test"); + } + private DorisCatalogFactory factory; private DorisCatalog catalog; @@ -74,7 +98,6 @@ public class DorisCatalogIT extends AbstractDorisIT { map.put(DorisOptions.QUERY_PORT.key(), QUERY_PORT); map.put(DorisOptions.USERNAME.key(), USERNAME); map.put(DorisOptions.PASSWORD.key(), PASSWORD); - map.put(DorisOptions.DEFAULT_DATABASE.key(), PASSWORD); catalog = (DorisCatalog) factory.createCatalog(catalogName, ReadonlyConfig.fromMap(map)); @@ -98,26 +121,10 @@ public class DorisCatalogIT extends AbstractDorisIT { return; } - TablePath tablePath = TablePath.of(DATABASE, SINK_TABLE); - - TableSchema.Builder builder = TableSchema.builder(); - builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false, 0, "k1")); - builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64, false, "", "k2")); - builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10, true, null, "v1")); - builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0, false, 0.1, "v2")); - builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2"))); - CatalogTable catalogTable = - CatalogTable.of( - TableIdentifier.of("doris", DATABASE, SINK_TABLE), - builder.build(), - Collections.emptyMap(), - Collections.emptyList(), - "test"); - boolean dbCreated = false; List<String> databases = catalog.listDatabases(); - Assertions.assertEquals(databases.size(), 1); + Assertions.assertFalse(databases.isEmpty()); if (!catalog.databaseExists(tablePath.getDatabaseName())) { catalog.createDatabase(tablePath, false); @@ -129,7 +136,7 @@ public class DorisCatalogIT extends AbstractDorisIT { Assertions.assertTrue(catalog.tableExists(tablePath)); List<String> tables = catalog.listTables(tablePath.getDatabaseName()); - Assertions.assertEquals(tables.size(), 1); + Assertions.assertFalse(tables.isEmpty()); catalog.dropTable(tablePath, false); Assertions.assertFalse(catalog.tableExists(tablePath)); @@ -140,6 +147,113 @@ public class DorisCatalogIT extends AbstractDorisIT { } } + @Test + void testSaveMode() { + CatalogTable upstreamTable = + CatalogTable.of( + TableIdentifier.of("doris", TablePath.of("test.test")), catalogTable); + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap<String, Object>() { + { + put( + DorisOptions.FENODES.key(), + container.getHost() + ":" + HTTP_PORT); + put(DorisOptions.USERNAME.key(), USERNAME); + put(DorisOptions.PASSWORD.key(), PASSWORD); + } + }); + assertCreateTable(upstreamTable, config, "test.test"); + + ReadonlyConfig config2 = + ReadonlyConfig.fromMap( + new HashMap<String, Object>() { + { + put( + DorisOptions.FENODES.key(), + container.getHost() + ":" + HTTP_PORT); + put(DorisOptions.DATABASE.key(), "test2"); + put(DorisOptions.TABLE.key(), "test2"); + put(DorisOptions.USERNAME.key(), USERNAME); + put(DorisOptions.PASSWORD.key(), PASSWORD); + } + }); + assertCreateTable(upstreamTable, config2, "test2.test2"); + + ReadonlyConfig config3 = + ReadonlyConfig.fromMap( + new HashMap<String, Object>() { + { + put( + DorisOptions.FENODES.key(), + container.getHost() + ":" + HTTP_PORT); + put(DorisOptions.TABLE_IDENTIFIER.key(), "test3.test3"); + put(DorisOptions.USERNAME.key(), USERNAME); + put(DorisOptions.PASSWORD.key(), PASSWORD); + } + }); + assertCreateTable(upstreamTable, config3, "test3.test3"); + + ReadonlyConfig config4 = + ReadonlyConfig.fromMap( + new HashMap<String, Object>() { + { + put( + DorisOptions.FENODES.key(), + container.getHost() + ":" + HTTP_PORT); + put(DorisOptions.DATABASE.key(), "test5"); + put(DorisOptions.TABLE.key(), "${table_name}"); + put(DorisOptions.USERNAME.key(), USERNAME); + put(DorisOptions.PASSWORD.key(), PASSWORD); + } + }); + assertCreateTable(upstreamTable, config4, "test5.test"); + + ReadonlyConfig config5 = + ReadonlyConfig.fromMap( + new HashMap<String, Object>() { + { + put( + DorisOptions.FENODES.key(), + container.getHost() + ":" + HTTP_PORT); + put(DorisOptions.DATABASE.key(), "test4"); + put(DorisOptions.TABLE.key(), "test4"); + put(DorisOptions.USERNAME.key(), USERNAME); + put(DorisOptions.PASSWORD.key(), PASSWORD); + put(DorisOptions.NEEDS_UNSUPPORTED_TYPE_CASTING.key(), true); + } + }); + upstreamTable + .getTableSchema() + .getColumns() + .add(PhysicalColumn.of("v3", new DecimalType(66, 22), 66, false, null, "v3")); + CatalogTable newTable = assertCreateTable(upstreamTable, config5, "test4.test4"); + Assertions.assertEquals( + BasicType.DOUBLE_TYPE, newTable.getTableSchema().getColumns().get(4).getDataType()); + } + + private CatalogTable assertCreateTable( + CatalogTable upstreamTable, ReadonlyConfig config, String fullName) { + DorisSinkFactory dorisSinkFactory = new DorisSinkFactory(); + TableSinkFactoryContext context = + new TableSinkFactoryContext( + upstreamTable, config, Thread.currentThread().getContextClassLoader()); + SupportSaveMode sink = (SupportSaveMode) dorisSinkFactory.createSink(context).createSink(); + sink.getSaveModeHandler().get().handleSaveMode(); + CatalogTable createdTable = catalog.getTable(TablePath.of(fullName)); + Assertions.assertEquals( + upstreamTable.getTableSchema().getColumns().size(), + createdTable.getTableSchema().getColumns().size()); + Assertions.assertIterableEquals( + upstreamTable.getTableSchema().getColumns().stream() + .map(Column::getName) + .collect(Collectors.toList()), + createdTable.getTableSchema().getColumns().stream() + .map(Column::getName) + .collect(Collectors.toList())); + return createdTable; + } + @AfterAll public void close() { if (catalog != null) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf index a4b2fbbd74..d4d4e69f9d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf @@ -36,7 +36,8 @@ sink { fenodes = "10.16.10.14:8234" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.label-prefix = "test-cdc" sink.enable-2pc = "false" sink.enable-delete = "true" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf index f656130239..4c717c14c8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/doris-jdbc-to-doris.conf @@ -38,7 +38,8 @@ sink { fenodes = "doris_e2e:8030" username = root password = "" - table.identifier = "test.e2e_table_sink" + database = "test" + table = "e2e_table_sink" sink.enable-2pc = "false" sink.label-prefix = "test_doris" doris.config = {