This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 7e0008e6f [Improve] [Connector-V2] [StarRocks] Starrocks Support Auto
Create Table (#4177)
7e0008e6f is described below
commit 7e0008e6fbcbb238bc86f97988618732b41127f1
Author: Hisoka <[email protected]>
AuthorDate: Tue Feb 21 17:22:20 2023 +0800
[Improve] [Connector-V2] [StarRocks] Starrocks Support Auto Create Table
(#4177)
---
docs/en/connector-v2/sink/StarRocks.md | 30 +-
.../seatunnel/api/sink/SaveModeConstants.java | 29 ++
.../seatunnel/api/table/catalog/CatalogTable.java | 11 +
.../seatunnel/api/table/catalog/TablePath.java | 4 +
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 2 +-
.../kafka/catalog/KafkaDataTypeConvertor.java | 3 +
.../connector-starrocks/pom.xml | 7 +
.../starrocks/catalog/StarRocksCatalog.java | 415 +++++++++++++++++++++
.../catalog/StarRocksDataTypeConvertor.java | 194 ++++++++++
.../starrocks/client/StarRocksSinkManager.java | 5 +-
.../seatunnel/starrocks/config/SinkConfig.java | 31 +-
.../starrocks/sink/StarRocksSaveModeUtil.java | 88 +++++
.../seatunnel/starrocks/sink/StarRocksSink.java | 55 ++-
.../starrocks/sink/StarRocksSinkFactory.java | 9 +-
.../starrocks/sink/StarRocksSinkWriter.java | 5 +-
.../seatunnel/starrocks/StarRocksCatalogTest.java | 82 ++++
16 files changed, 948 insertions(+), 22 deletions(-)
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
index 5bdddffbc..207823bda 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -18,7 +18,7 @@ The internal implementation of StarRocks sink connector is
cached and imported b
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
-| table | string | yes | - |
+| table | string | no | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
@@ -27,6 +27,7 @@ The internal implementation of StarRocks sink connector is
cached and imported b
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| enable_upsert_delete | boolean | no | false |
+| save_mode_create_template | string | no | see below |
| starrocks.config | map | no | - |
### node_urls [list]
@@ -47,7 +48,7 @@ The name of StarRocks database
### table [string]
-The name of StarRocks table
+The name of StarRocks table, If not set, the table name will be the name of
the upstream table
### labelPrefix [string]
@@ -81,6 +82,31 @@ The amount of time to wait before attempting to retry a
request to `StarRocks`
Whether to enable upsert/delete, only supports PrimaryKey model.
+### save_mode_create_template [string]
+
+We use templates to automatically create starrocks tables,
+which will create corresponding table creation statements based on the type of
upstream data and schema type,
+and the default template can be modified according to the situation
+
+```sql
+CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
+(
+ ${rowtype_fields}
+) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
+ PROPERTIES
+(
+ "replication_num" = "1"
+);
+```
+
+You can use the following placeholders
+
+- database: Used to get the database in the upstream schema
+- table_name: Used to get the table name in the upstream schema
+- rowtype_fields: Used to get all the fields in the upstream schema, we will
automatically map to the field
+ description of StarRocks
+- rowtype_primary_key: Used to get the primary key in the upstream schema
(maybe a list)
+
### starrocks.config [map]
The parameter of the stream load `data_desc`
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
new file mode 100644
index 000000000..e3e56089a
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModeConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink;
+
+public class SaveModeConstants {
+
+ public static final String ROWTYPE_PRIMARY_KEY = "rowtype_primary_key";
+ public static final String ROWTYPE_FIELDS = "rowtype_fields";
+
+ public static final String TABLE_NAME = "table_name";
+
+ public static final String DATABASE = "database";
+
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
index 1c8542300..9a8c6817f 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTable.java
@@ -90,4 +90,15 @@ public final class CatalogTable implements Serializable {
public String getComment() {
return comment;
}
+
+ @Override
+ public String toString() {
+ return "CatalogTable{" +
+ "tableId=" + tableId +
+ ", tableSchema=" + tableSchema +
+ ", options=" + options +
+ ", partitionKeys=" + partitionKeys +
+ ", comment='" + comment + '\'' +
+ '}';
+ }
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
index 61f61d936..0de425e63 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java
@@ -58,6 +58,10 @@ public final class TablePath implements Serializable {
return String.format("%s.%s", databaseName, tableName);
}
+ public String getFullNameWithQuoted() {
+ return String.format("`%s`.`%s`", databaseName, tableName);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index c4b8abcab..429de5683 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -127,7 +127,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath.getDatabaseName(), tablePath.getTableName());
List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath.getDatabaseName(), tablePath.getTableName());
- try (PreparedStatement ps =
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;",
tablePath.getFullName()))) {
+ try (PreparedStatement ps =
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;",
tablePath.getFullNameWithQuoted()))) {
ResultSetMetaData tableMetaData = ps.getMetaData();
TableSchema.Builder builder = TableSchema.builder();
// add column
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
index 99e0ab714..3a879e494 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaDataTypeConvertor.java
@@ -24,6 +24,8 @@ import
org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import com.google.auto.service.AutoService;
+
import java.util.Map;
/**
@@ -39,6 +41,7 @@ import java.util.Map;
* </pre>
* <p> Right now the data type of kafka is SeaTunnelType, so we don't need to
convert the data type.
*/
+@AutoService(DataTypeConvertor.class)
public class KafkaDataTypeConvertor implements
DataTypeConvertor<SeaTunnelDataType<?>> {
@Override
diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml
b/seatunnel-connectors-v2/connector-starrocks/pom.xml
index f7992271b..232e358e0 100644
--- a/seatunnel-connectors-v2/connector-starrocks/pom.xml
+++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml
@@ -31,6 +31,7 @@
<properties>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
+ <mysql.version>8.0.16</mysql.version>
</properties>
<dependencies>
@@ -44,6 +45,12 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
new file mode 100644
index 000000000..46fdb12b7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import com.mysql.cj.MysqlType;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class StarRocksCatalog implements Catalog {
+
+ protected final String catalogName;
+ protected final String defaultDatabase;
+ protected final String username;
+ protected final String pwd;
+ protected final String baseUrl;
+ protected final String defaultUrl;
+
+ private static final Set<String> SYS_DATABASES = new HashSet<>();
+ private static final Logger LOG =
LoggerFactory.getLogger(StarRocksCatalog.class);
+
+ static {
+ SYS_DATABASES.add("information_schema");
+ SYS_DATABASES.add("_statistics_");
+ }
+
+ public StarRocksCatalog(
+ String catalogName,
+ String username,
+ String pwd,
+ String defaultUrl) {
+
+ checkArgument(StringUtils.isNotBlank(username));
+ checkArgument(StringUtils.isNotBlank(pwd));
+ checkArgument(StringUtils.isNotBlank(defaultUrl));
+
+ defaultUrl = defaultUrl.trim();
+ validateJdbcUrlWithDatabase(defaultUrl);
+ this.catalogName = catalogName;
+ this.username = username;
+ this.pwd = pwd;
+ this.defaultUrl = defaultUrl;
+ String[] strings = splitDefaultUrl(defaultUrl);
+ this.baseUrl = strings[0];
+ this.defaultDatabase = strings[1];
+ }
+
+ public StarRocksCatalog(
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+
+ checkArgument(StringUtils.isNotBlank(username));
+ checkArgument(StringUtils.isNotBlank(pwd));
+ checkArgument(StringUtils.isNotBlank(baseUrl));
+
+ baseUrl = baseUrl.trim();
+ validateJdbcUrlWithoutDatabase(baseUrl);
+ this.catalogName = catalogName;
+ this.defaultDatabase = defaultDatabase;
+ this.username = username;
+ this.pwd = pwd;
+ this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
+ this.defaultUrl = this.baseUrl + defaultDatabase;
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+
+ PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+
+ List<String> databases = new ArrayList<>();
+ ResultSet rs = ps.executeQuery();
+
+ while (rs.next()) {
+ String databaseName = rs.getString(1);
+ if (!SYS_DATABASES.contains(databaseName)) {
+ databases.add(rs.getString(1));
+ }
+ }
+
+ return databases;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
this.catalogName), e);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
CatalogException, DatabaseNotExistException {
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(this.catalogName,
databaseName);
+ }
+
+ try (Connection conn = DriverManager.getConnection(baseUrl +
databaseName, username, pwd)) {
+ PreparedStatement ps =
+ conn.prepareStatement("SHOW TABLES;");
+
+ ResultSet rs = ps.executeQuery();
+
+ List<String> tables = new ArrayList<>();
+
+ while (rs.next()) {
+ tables.add(rs.getString(1));
+ }
+
+ return tables;
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath) throws CatalogException,
TableNotExistException {
+ if (!tableExists(tablePath)) {
+ throw new TableNotExistException(catalogName, tablePath);
+ }
+
+ String dbUrl = baseUrl + tablePath.getDatabaseName();
+ try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd)) {
+ Optional<PrimaryKey> primaryKey =
getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName());
+
+ PreparedStatement ps =
+ conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1
= 0;", tablePath.getFullNameWithQuoted()));
+
+ ResultSetMetaData tableMetaData = ps.getMetaData();
+
+ TableSchema.Builder builder = TableSchema.builder();
+ for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
+ SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
+ // TODO add default value and test it
+
builder.column(PhysicalColumn.of(tableMetaData.getColumnName(i), type,
tableMetaData.getColumnDisplaySize(i),
+ tableMetaData.isNullable(i) ==
ResultSetMetaData.columnNullable, null,
+ tableMetaData.getColumnLabel(i)));
+ }
+
+ primaryKey.ifPresent(builder::primaryKey);
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(catalogName,
tablePath.getDatabaseName(), tablePath.getTableName());
+ return CatalogTable.of(tableIdentifier, builder.build(),
buildConnectorOptions(tablePath), Collections.emptyList(), "");
+ } catch (Exception e) {
+ throw new CatalogException(String.format("Failed getting table
%s", tablePath.getFullName()), e);
+ }
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table, boolean
ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
+ try (Connection conn = DriverManager.getConnection(baseUrl, username,
pwd)) {
+ if (ignoreIfExists) {
+ conn.createStatement().execute("CREATE DATABASE IF NOT EXISTS
`" + tablePath.getDatabaseName() + "`");
+ } else {
+ conn.createStatement().execute("CREATE DATABASE `" +
tablePath.getDatabaseName() + "`");
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
+ try (Connection conn = DriverManager.getConnection(baseUrl, username,
pwd)) {
+ if (ignoreIfNotExists) {
+ conn.createStatement().execute("DROP DATABASE IF EXISTS `" +
tablePath.getDatabaseName() + "`");
+ } else {
+ conn.createStatement().execute(String.format("DROP DATABASE
`%s`", tablePath.getDatabaseName()));
+ }
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ /**
+ * @see com.mysql.cj.MysqlType
+ */
+ private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex) throws SQLException {
+ MysqlType starrocksType =
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
+ switch (starrocksType) {
+ case NULL:
+ return BasicType.VOID_TYPE;
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case BIT:
+ case TINYINT:
+ return BasicType.BYTE_TYPE;
+ case TINYINT_UNSIGNED:
+ case SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case SMALLINT_UNSIGNED:
+ case INT:
+ case MEDIUMINT:
+ case MEDIUMINT_UNSIGNED:
+ return BasicType.INT_TYPE;
+ case INT_UNSIGNED:
+ case BIGINT:
+ return BasicType.LONG_TYPE;
+ case FLOAT:
+ case FLOAT_UNSIGNED:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ case DOUBLE_UNSIGNED:
+ return BasicType.DOUBLE_TYPE;
+ case TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TIMESTAMP:
+ case DATETIME:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case CHAR:
+ case VARCHAR:
+ case TINYTEXT:
+ case TEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case JSON:
+ case ENUM:
+ return BasicType.STRING_TYPE;
+ case BINARY:
+ case VARBINARY:
+ case TINYBLOB:
+ case BLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ case GEOMETRY:
+ return PrimitiveByteArrayType.INSTANCE;
+ case BIGINT_UNSIGNED:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+ return new DecimalType(precision, scale);
+ default:
+ throw new
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("Doesn't support Starrocks type '%s' yet",
starrocksType.getName()));
+ }
+ }
+
+ @SuppressWarnings("MagicNumber")
+ private Map<String, String> buildConnectorOptions(TablePath tablePath) {
+ Map<String, String> options = new HashMap<>(8);
+ options.put("connector", "starrocks");
+ options.put("url", baseUrl + tablePath.getDatabaseName());
+ options.put("table-name", tablePath.getFullName());
+ options.put("username", username);
+ options.put("password", pwd);
+ return options;
+ }
+
+ public void createTable(String sql) throws TableAlreadyExistException,
DatabaseNotExistException, CatalogException {
+ try (Connection conn = DriverManager.getConnection(baseUrl +
getDefaultDatabase(), username, pwd)) {
+ conn.createStatement().execute(sql);
+ } catch (Exception e) {
+ throw new CatalogException(
+ String.format("Failed listing database in catalog %s",
catalogName), e);
+ }
+ }
+
+ /**
+ * URL has to be without database, like "jdbc:mysql://localhost:5432/" or
+ * "jdbc:mysql://localhost:5432" rather than
"jdbc:mysql://localhost:5432/db".
+ */
+ public static void validateJdbcUrlWithoutDatabase(String url) {
+ String[] parts = url.trim().split("\\/+");
+
+ checkArgument(parts.length == 2);
+ }
+
+ /**
+ * URL has to be with database, like "jdbc:mysql://localhost:5432/db"
rather than "jdbc:mysql://localhost:5432/".
+ */
+ @SuppressWarnings("MagicNumber")
+ public static void validateJdbcUrlWithDatabase(String url) {
+ String[] parts = url.trim().split("\\/+");
+ checkArgument(parts.length == 3);
+ }
+
+ /**
+ * Ensure that the url was validated {@link #validateJdbcUrlWithDatabase}.
+ *
+ * @return The array size is fixed at 2, index 0 is base url, and index 1
is default database.
+ */
+ public static String[] splitDefaultUrl(String defaultUrl) {
+ String[] res = new String[2];
+ int index = defaultUrl.lastIndexOf("/") + 1;
+ res[0] = defaultUrl.substring(0, index);
+ res[1] = defaultUrl.substring(index);
+ return res;
+ }
+
+ @Override
+ public String getDefaultDatabase() {
+ return defaultDatabase;
+ }
+
+ @Override
+ public void open() throws CatalogException {
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ // test connection, fail early if we cannot connect to database
+ conn.getCatalog();
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("Failed connecting to %s via JDBC.",
defaultUrl), e);
+ }
+
+ LOG.info("Catalog {} established connection to {}", catalogName,
defaultUrl);
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ LOG.info("Catalog {} closing", catalogName);
+ }
+
+ protected Optional<PrimaryKey> getPrimaryKey(String schema, String table)
throws SQLException {
+
+ List<String> pkFields = new ArrayList<>();
+ try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ ResultSet rs = conn.createStatement().executeQuery(
+ String.format("SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION", schema, table));
+ while (rs.next()) {
+ String columnName = rs.getString("COLUMN_NAME");
+ pkFields.add(columnName);
+ }
+ }
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc, generate a unique
name in that case
+ String pkName = "pk_" + String.join("_", pkFields);
+ return Optional.of(PrimaryKey.of(pkName, pkFields));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws CatalogException
{
+ checkArgument(StringUtils.isNotBlank(databaseName));
+
+ return listDatabases().contains(databaseName);
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws CatalogException {
+ try {
+ return databaseExists(tablePath.getDatabaseName())
+ &&
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
new file mode 100644
index 000000000..ece895d98
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDataTypeConvertor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.mysql.cj.MysqlType;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class StarRocksDataTypeConvertor implements
DataTypeConvertor<MysqlType> {
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
+ public static final Integer DEFAULT_PRECISION = 10;
+
+ public static final Integer DEFAULT_SCALE = 0;
+
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+ checkNotNull(connectorDataType, "connectorDataType can not be null");
+ MysqlType mysqlType = MysqlType.getByName(connectorDataType);
+ Map<String, Object> dataTypeProperties;
+ switch (mysqlType) {
+ case BIGINT_UNSIGNED:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ // parse precision and scale
+ int left = connectorDataType.indexOf("(");
+ int right = connectorDataType.indexOf(")");
+ int precision = DEFAULT_PRECISION;
+ int scale = DEFAULT_SCALE;
+ if (left != -1 && right != -1) {
+ String[] precisionAndScale =
connectorDataType.substring(left + 1, right).split(",");
+ if (precisionAndScale.length == 2) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ scale = Integer.parseInt(precisionAndScale[1]);
+ } else if (precisionAndScale.length == 1) {
+ precision = Integer.parseInt(precisionAndScale[0]);
+ }
+ }
+ dataTypeProperties = ImmutableMap.of(PRECISION, precision,
SCALE, scale);
+ break;
+ default:
+ dataTypeProperties = Collections.emptyMap();
+ break;
+ }
+ return toSeaTunnelType(mysqlType, dataTypeProperties);
+ }
+
+ // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains properties.
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType,
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+ checkNotNull(mysqlType, "mysqlType can not be null");
+
+ switch (mysqlType) {
+ case NULL:
+ return BasicType.VOID_TYPE;
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case BIT:
+ case TINYINT:
+ return BasicType.BYTE_TYPE;
+ case TINYINT_UNSIGNED:
+ case SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case SMALLINT_UNSIGNED:
+ case INT:
+ case MEDIUMINT:
+ case MEDIUMINT_UNSIGNED:
+ return BasicType.INT_TYPE;
+ case INT_UNSIGNED:
+ case BIGINT:
+ return BasicType.LONG_TYPE;
+ case FLOAT:
+ case FLOAT_UNSIGNED:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ case DOUBLE_UNSIGNED:
+ return BasicType.DOUBLE_TYPE;
+ case TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case TIMESTAMP:
+ case DATETIME:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ // TODO: to confirm
+ case CHAR:
+ case VARCHAR:
+ case TINYTEXT:
+ case TEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case JSON:
+ case ENUM:
+ return BasicType.STRING_TYPE;
+ case BINARY:
+ case VARBINARY:
+ case TINYBLOB:
+ case BLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ case GEOMETRY:
+ return PrimitiveByteArrayType.INSTANCE;
+ case BIGINT_UNSIGNED:
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ Integer precision = MapUtils.getInteger(dataTypeProperties,
PRECISION, DEFAULT_PRECISION);
+ Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE,
DEFAULT_SCALE);
+ return new DecimalType(precision, scale);
+ // TODO: support 'SET' & 'YEAR' type
+ default:
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
+ }
+ }
+
+ @Override
+ public MysqlType toConnectorType(SeaTunnelDataType<?> seaTunnelDataType,
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+ SqlType sqlType = seaTunnelDataType.getSqlType();
+ // todo: verify
+ switch (sqlType) {
+ case ARRAY:
+ case MAP:
+ case ROW:
+ case STRING:
+ return MysqlType.VARCHAR;
+ case BOOLEAN:
+ return MysqlType.BOOLEAN;
+ case TINYINT:
+ return MysqlType.TINYINT;
+ case SMALLINT:
+ return MysqlType.SMALLINT;
+ case INT:
+ return MysqlType.INT;
+ case BIGINT:
+ return MysqlType.BIGINT;
+ case FLOAT:
+ return MysqlType.FLOAT;
+ case DOUBLE:
+ return MysqlType.DOUBLE;
+ case DECIMAL:
+ return MysqlType.DECIMAL;
+ case NULL:
+ return MysqlType.NULL;
+ case BYTES:
+ return MysqlType.BIT;
+ case DATE:
+ return MysqlType.DATE;
+ case TIME:
+ return MysqlType.DATETIME;
+ case TIMESTAMP:
+ return MysqlType.TIMESTAMP;
+ default:
+ throw new
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("Doesn't support type '%s' yet", sqlType));
+ }
+ }
+
+ @Override
+ public String getIdentity() {
+ return "StarRocks";
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index f51feff7f..0a12c8c50 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -41,15 +41,14 @@ public class StarRocksSinkManager {
private final SinkConfig sinkConfig;
private final List<byte[]> batchList;
- private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
+ private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
private int batchRowCount = 0;
private long batchBytesSize = 0;
-
- private Integer batchIntervalMs;
+ private final Integer batchIntervalMs;
public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames)
{
this.sinkConfig = sinkConfig;
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index 3320eab69..5ba7543c5 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -27,6 +27,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,7 @@ import java.util.Map;
@Setter
@Getter
@ToString
-public class SinkConfig {
+public class SinkConfig implements Serializable {
private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
@@ -79,6 +80,21 @@ public class SinkConfig {
.withDescription("The parameter of the stream load data_desc. " +
"The way to specify the parameter is to add the original stream
load parameter into map");
+ public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
+ .stringType()
+ .defaultValue("CREATE TABLE IF NOT EXISTS
`${database}`.`${table_name}` (\n" +
+ "${rowtype_fields}\n" +
+ ") ENGINE=OLAP\n" +
+ "DISTRIBUTED BY HASH (${rowtype_primary_key})" +
+ "PROPERTIES (\n" +
+ " \"replication_num\" = \"1\" \n" +
+ ")").withDescription("Create table statement template, used to
create StarRocks table");
+
+ public static final Option<String> QUERY_PORT = Options.key("query_port")
+ .stringType()
+ .defaultValue("9030")
+ .withDescription("FE MySQL server port");
+
public static final Option<Integer> BATCH_MAX_SIZE =
Options.key("batch_max_rows")
.intType()
.defaultValue(DEFAULT_BATCH_MAX_SIZE)
@@ -125,6 +141,7 @@ public class SinkConfig {
}
private List<String> nodeUrls;
+ private String jdbcUrl;
private String username;
private String password;
private String database;
@@ -142,17 +159,22 @@ public class SinkConfig {
private int maxRetryBackoffMs;
private boolean enableUpsertDelete;
+ private String saveModeCreateTemplate =
SAVE_MODE_CREATE_TEMPLATE.defaultValue();
+
private final Map<String, Object> streamLoadProps = new HashMap<>();
public static SinkConfig loadConfig(Config pluginConfig) {
SinkConfig sinkConfig = new SinkConfig();
sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS.key()));
sinkConfig.setDatabase(pluginConfig.getString(DATABASE.key()));
- sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
-
+ sinkConfig.setJdbcUrl("jdbc:mysql://" +
sinkConfig.getNodeUrls().get(0).split(":")[0] +
+ ":" + pluginConfig.getString(QUERY_PORT.key()) + "/");
if (pluginConfig.hasPath(USERNAME.key())) {
sinkConfig.setUsername(pluginConfig.getString(USERNAME.key()));
}
+ if (pluginConfig.hasPath(TABLE.key())) {
+ sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
+ }
if (pluginConfig.hasPath(PASSWORD.key())) {
sinkConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
}
@@ -180,6 +202,9 @@ public class SinkConfig {
if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) {
sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key()));
}
+ if (pluginConfig.hasPath(SAVE_MODE_CREATE_TEMPLATE.key())) {
+
sinkConfig.setSaveModeCreateTemplate(pluginConfig.getString(SAVE_MODE_CREATE_TEMPLATE.key()));
+ }
parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
sinkConfig.setColumnSeparator((String)
sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
new file mode 100644
index 000000000..2179e77db
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.sink.SaveModeConstants;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.stream.Collectors;
+
+public class StarRocksSaveModeUtil {
+
+ public static String fillingCreateSql(String template, String database,
String table, TableSchema tableSchema) {
+ String primaryKey =
tableSchema.getPrimaryKey().getColumnNames().stream().map(r -> "`" + r +
"`").collect(Collectors.joining(","));
+ String rowTypeFields =
tableSchema.getColumns().stream().map(StarRocksSaveModeUtil::columnToStarrocksType)
+ .collect(Collectors.joining(",\n"));
+ return template.replace(String.format("${%s}",
SaveModeConstants.DATABASE), database)
+ .replace(String.format("${%s}", SaveModeConstants.TABLE_NAME),
table)
+ .replace(String.format("${%s}", SaveModeConstants.ROWTYPE_FIELDS),
rowTypeFields)
+ .replace(String.format("${%s}",
SaveModeConstants.ROWTYPE_PRIMARY_KEY), primaryKey);
+ }
+
+ static String columnToStarrocksType(Column column) {
+ checkNotNull(column, "The column is required.");
+ return String.format("`%s` %s %s %s", column.getName(),
dataTypeToStarrocksType(column.getDataType()),
+ column.isNullable() ? "NULL" : "NOT NULL",
column.getDefaultValue() == null ? "" : column.getDefaultValue().toString());
+ }
+
+ static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType) {
+ checkNotNull(dataType, "The SeaTunnel's data type is required.");
+ switch (dataType.getSqlType()) {
+ case NULL:
+ case TIME:
+ throw new IllegalArgumentException("Unsupported SeaTunnel's
data type: " + dataType);
+ case STRING:
+ case BYTES:
+ return "STRING";
+ case BOOLEAN:
+ return "BOOLEAN";
+ case TINYINT:
+ return "TINYINT";
+ case SMALLINT:
+ return "SMALLINT";
+ case INT:
+ return "INT";
+ case BIGINT:
+ return "BIGINT";
+ case FLOAT:
+ return "FLOAT";
+ case DOUBLE:
+ return "DOUBLE";
+ case DATE:
+ return "DATE";
+ case TIMESTAMP:
+ return "DATETIME";
+ case ARRAY:
+ return "ARRAY<" + dataTypeToStarrocksType(((ArrayType<?, ?>)
dataType).getElementType()) + ">";
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ return String.format("Decimal(%d, %d)",
decimalType.getPrecision(), decimalType.getScale());
+ case MAP:
+ case ROW:
+ return "JSON";
+ default:
+ }
+ throw new IllegalArgumentException("Unsupported SeaTunnel's data type:
" + dataType);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index 16f303742..c62300c46 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -25,8 +25,12 @@ import static
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkCon
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportDataSaveMode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -35,18 +39,24 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
@AutoService(SeaTunnelSink.class)
-public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class StarRocksSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportDataSaveMode {
- private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
-
+ private SinkConfig sinkConfig;
+ private DataSaveMode dataSaveMode;
@Override
public String getPluginName() {
return "StarRocks";
@@ -54,13 +64,33 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
if (!result.isSuccess()) {
throw new
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
+ // TODO get catalog Table
+ CatalogTable catalogTable = null;
+ sinkConfig = SinkConfig.loadConfig(pluginConfig);
+ if (StringUtils.isEmpty(sinkConfig.getTable())) {
+ sinkConfig.setTable(catalogTable.getTableId().getTableName());
+ }
+ sinkConfig.setTable(catalogTable.getTableId().getTableName());
+ dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
+ }
+
+ private void autoCreateTable(String template) {
+ StarRocksCatalog starRocksCatalog = new StarRocksCatalog("StarRocks",
sinkConfig.getDatabase(),
+ sinkConfig.getUsername(), sinkConfig.getPassword(),
sinkConfig.getJdbcUrl());
+ if (!starRocksCatalog.databaseExists(sinkConfig.getDatabase())) {
+
starRocksCatalog.createDatabase(TablePath.of(sinkConfig.getDatabase(), ""),
true);
+ }
+ // TODO get catalog Table
+ CatalogTable catalogTable = null;
+ if
(!starRocksCatalog.tableExists(TablePath.of(sinkConfig.getDatabase(),
sinkConfig.getTable()))) {
+
starRocksCatalog.createTable(StarRocksSaveModeUtil.fillingCreateSql(template,
sinkConfig.getDatabase(), sinkConfig.getTable(),
catalogTable.getTableSchema()));
+ }
}
@Override
@@ -75,6 +105,21 @@ public class StarRocksSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
- return new StarRocksSinkWriter(pluginConfig, seaTunnelRowType);
+ return new StarRocksSinkWriter(sinkConfig, seaTunnelRowType);
+ }
+
+ @Override
+ public DataSaveMode getDataSaveMode() {
+ return dataSaveMode;
+ }
+
+ @Override
+ public List<DataSaveMode> supportedDataSaveModeValues() {
+ return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
+ }
+
+ @Override
+ public void handleSaveMode(DataSaveMode saveMode) {
+ autoCreateTable(sinkConfig.getSaveModeCreateTemplate());
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index b04750a94..8863c3115 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -34,10 +34,11 @@ public class StarRocksSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME,
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
- .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE,
SinkConfig.BATCH_MAX_BYTES,
- SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES,
SinkConfig.MAX_RETRY_BACKOFF_MS,
- SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS,
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE)
+ .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME,
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.QUERY_PORT)
+ .optional(SinkConfig.TABLE, SinkConfig.LABEL_PREFIX,
SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
+ SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES,
SinkConfig.MAX_RETRY_BACKOFF_MS,
+ SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS,
SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE,
+ SinkConfig.SAVE_MODE_CREATE_TEMPLATE)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index abe6e41cf..52647c34f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -29,8 +29,6 @@ import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksIS
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -46,9 +44,8 @@ public class StarRocksSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
private final StarRocksISerializer serializer;
private final StarRocksSinkManager manager;
- public StarRocksSinkWriter(Config pluginConfig,
+ public StarRocksSinkWriter(SinkConfig sinkConfig,
SeaTunnelRowType seaTunnelRowType) {
- SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
List<String> fieldNames =
Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
if (sinkConfig.isEnableUpsertDelete()) {
fieldNames.add(StarRocksSinkOP.COLUMN_KEY);
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
new file mode 100644
index 000000000..51a430f14
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/StarRocksCatalogTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@Disabled("Please Test it in your local environment")
+public class StarRocksCatalogTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StarRocksCatalogTest.class);
+
+ @Test
+ public void testCatalog() {
+ StarRocksCatalog catalog = new StarRocksCatalog("starrocks", "",
"root", "123456", "jdbc:mysql://47.108.65.163:9030/");
+ List<String> databases = catalog.listDatabases();
+ LOGGER.info("find databases: " + databases);
+
+ if (!catalog.databaseExists("default")) {
+ catalog.createDatabase(TablePath.of("default", null), true);
+ }
+
+ databases = catalog.listDatabases();
+ LOGGER.info("find databases: " + databases);
+
+ catalog.createTable("CREATE TABLE IF NOT EXISTS `default`.`test` (\n" +
+ "`recruit_date` DATE NOT NULL COMMENT
\"YYYY-MM-DD\",\n" +
+ " `region_num` TINYINT COMMENT \"range [-128,
127]\",\n" +
+ " `num_plate` SMALLINT COMMENT \"range [-32768,
32767] \",\n" +
+ " `tel` INT COMMENT \"range [-2147483648,
2147483647]\",\n" +
+ " `id` BIGINT COMMENT \"range [-2^63 + 1 ~
2^63 - 1]\",\n" +
+ " `password` LARGEINT COMMENT \"range [-2^127 + 1 ~
2^127 - 1]\",\n" +
+ " `name` CHAR(20) NOT NULL COMMENT \"range
char(m),m in (1-255)\",\n" +
+ " `profile` VARCHAR(500) NOT NULL COMMENT \"upper limit
value 1048576 bytes\",\n" +
+ " `hobby` STRING NOT NULL COMMENT \"upper limit
value 65533 bytes\",\n" +
+ " `leave_time` DATETIME COMMENT \"YYYY-MM-DD
HH:MM:SS\",\n" +
+ " `channel` FLOAT COMMENT \"4 bytes\",\n" +
+ " `income` DOUBLE COMMENT \"8 bytes\",\n" +
+ " `account` DECIMAL(12,4) COMMENT \"\",\n" +
+ " `ispass` BOOLEAN COMMENT \"true/false\"\n" +
+ ") ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`recruit_date`, `region_num`)\n" +
+ "PARTITION BY RANGE(`recruit_date`)\n" +
+ "(\n" +
+ " PARTITION p20220311 VALUES [('2022-03-11'),
('2022-03-12')),\n" +
+ " PARTITION p20220312 VALUES [('2022-03-12'),
('2022-03-13')),\n" +
+ " PARTITION p20220313 VALUES [('2022-03-13'),
('2022-03-14')),\n" +
+ " PARTITION p20220314 VALUES [('2022-03-14'),
('2022-03-15')),\n" +
+ " PARTITION p20220315 VALUES [('2022-03-15'),
('2022-03-16'))\n" +
+ ")" +
+ " DISTRIBUTED BY HASH (`id`)\n" +
+ " PROPERTIES (\n" +
+ " \"replication_num\" = \"1\" \n" +
+ " );");
+ CatalogTable table = catalog.getTable(TablePath.of("default", "test"));
+ LOGGER.info("find table: " + table);
+ }
+
+}