This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b5140f598e [Improvement] add starrocks jdbc dialect (#7294) b5140f598e is described below commit b5140f598ee8f02ba7e6d25428c56a5904aa01ad Author: Jarvis <liunaijie1...@163.com> AuthorDate: Wed Aug 14 22:33:57 2024 +0800 [Improvement] add starrocks jdbc dialect (#7294) --- docs/en/connector-v2/sink/Jdbc.md | 4 +- docs/en/connector-v2/source/Jdbc.md | 2 +- docs/zh/connector-v2/sink/Jdbc.md | 4 +- .../jdbc/internal/dialect/DatabaseIdentifier.java | 1 + .../dialect/mysql/MySqlDialectFactory.java | 5 +++ .../StarRocksDialect.java} | 26 ++++++------- .../seatunnel/jdbc/JdbcStarRocksdbIT.java | 5 ++- .../src/test/resources/jdbc_starrocks_dialect.conf | 44 ++++++++++++++++++++++ 8 files changed, 71 insertions(+), 20 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index c46933b486..8ec58506b4 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -82,7 +82,9 @@ Use this sql write upstream input datas to database. e.g `INSERT ...` ### compatible_mode [string] -The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. +The compatible mode of database, required when the database supports multiple compatible modes. + +For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. when using StarRocks, you need set it to `starrocks`. Postgres 9.5 version or below,please set it to `postgresLow` to support cdc diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 7fab8d50b2..1b9acc025b 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -46,7 +46,7 @@ supports query SQL and can achieve projection effect. | user | String | No | - | userName [...] | password | String | No | - | password [...] | query | String | No | - | Query statement [...] -| compatible_mode | String | No | - | The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. [...] +| compatible_mode | String | No | - | The compatible mode of database, required when the database supports multiple compatible modes.<br/> For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. <br/> when using starrocks, you need set it to `starrocks` [...] | connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. [...] | partition_column | String | No | - | The column name for split data. [...] | partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. [...] diff --git a/docs/zh/connector-v2/sink/Jdbc.md b/docs/zh/connector-v2/sink/Jdbc.md index d61292cb92..0cc8605a37 100644 --- a/docs/zh/connector-v2/sink/Jdbc.md +++ b/docs/zh/connector-v2/sink/Jdbc.md @@ -79,7 +79,9 @@ JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test` ### compatible_mode [string] -数据库的兼容模式,当数据库支持多种兼容模式时需要。例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。 +数据库的兼容模式,当数据库支持多种兼容模式时需要。 + +例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。使用StarRocks时,需要将其设置为`starrocks`。 Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java index bf00298a74..45f849c28b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -25,6 +25,7 @@ public class DatabaseIdentifier { public static final String INFORMIX = "Informix"; public static final String KINGBASE = "KingBase"; public static final String MYSQL = "MySQL"; + public static final String STARROCKS = "StarRocks"; public static final String ORACLE = "Oracle"; public static final String PHOENIX = "Phoenix"; public static final String POSTGRESQL = "Postgres"; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java index a4f89a4dc8..f8278a60cc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java @@ -17,8 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks.StarRocksDialect; import com.google.auto.service.AutoService; @@ -39,6 +41,9 @@ public class MySqlDialectFactory implements JdbcDialectFactory { @Override public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) { + return new StarRocksDialect(fieldIde); + } return new MysqlDialect(fieldIde); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java similarity index 64% copy from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java index a4f89a4dc8..d7ee796527 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java @@ -15,30 +15,26 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; -import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; -import com.google.auto.service.AutoService; +public class StarRocksDialect extends MysqlDialect { -import javax.annotation.Nonnull; + public StarRocksDialect() {} -/** Factory for {@link MysqlDialect}. */ -@AutoService(JdbcDialectFactory.class) -public class MySqlDialectFactory implements JdbcDialectFactory { - @Override - public boolean acceptsURL(String url) { - return url.startsWith("jdbc:mysql:"); + public StarRocksDialect(String fieldIde) { + this.fieldIde = fieldIde; } @Override - public JdbcDialect create() { - return new MysqlDialect(); + public String dialectName() { + return DatabaseIdentifier.STARROCKS; } @Override - public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { - return new MysqlDialect(fieldIde); + public String hashModForField(String fieldName, int mod) { + return "ABS(md5sum_numeric(" + quoteIdentifier(fieldName) + ") % " + mod + ")"; } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java index e7fc94e642..1d41c480c3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java @@ -37,7 +37,7 @@ import java.util.List; public class JdbcStarRocksdbIT extends AbstractJdbcIT { - private static final String DOCKER_IMAGE = "d87904488/starrocks-starter:2.2.1"; + private static final String DOCKER_IMAGE = "starrocks/allin1-ubuntu:2.5.12"; private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; private static final String NETWORK_ALIASES = "e2e_starRocksdb"; private static final int SR_PORT = 9030; @@ -51,7 +51,8 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT { private static final String SINK_TABLE = "e2e_table_sink"; private static final List<String> CONFIG_FILE = - Lists.newArrayList("/jdbc_starrocks_source_to_sink.conf"); + Lists.newArrayList( + "/jdbc_starrocks_source_to_sink.conf", "/jdbc_starrocks_dialect.conf"); private static final String CREATE_SQL = "create table %s (\n" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf new file mode 100644 index 0000000000..69fe5538f5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar" +} + +source { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://e2e_starRocksdb:9030" + user = root + password = "" + query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from `test`.`e2e_table_source`" + partition_column = "STRING_COL" + compatible_mode = "starrocks" + } +} + +sink { + Jdbc { + driver = com.mysql.cj.jdbc.Driver + url = "jdbc:mysql://e2e_starRocksdb:9030" + user = root + password = "" + query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } +}