This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5140f598e [Improvement] add starrocks jdbc dialect (#7294)
b5140f598e is described below

commit b5140f598ee8f02ba7e6d25428c56a5904aa01ad
Author: Jarvis <liunaijie1...@163.com>
AuthorDate: Wed Aug 14 22:33:57 2024 +0800

    [Improvement] add starrocks jdbc dialect (#7294)
---
 docs/en/connector-v2/sink/Jdbc.md                  |  4 +-
 docs/en/connector-v2/source/Jdbc.md                |  2 +-
 docs/zh/connector-v2/sink/Jdbc.md                  |  4 +-
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |  1 +
 .../dialect/mysql/MySqlDialectFactory.java         |  5 +++
 .../StarRocksDialect.java}                         | 26 ++++++-------
 .../seatunnel/jdbc/JdbcStarRocksdbIT.java          |  5 ++-
 .../src/test/resources/jdbc_starrocks_dialect.conf | 44 ++++++++++++++++++++++
 8 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index c46933b486..8ec58506b4 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -82,7 +82,9 @@ Use this sql write upstream input datas to database. e.g 
`INSERT ...`
 
 ### compatible_mode [string]
 
-The compatible mode of database, required when the database supports multiple 
compatible modes. For example, when using OceanBase database, you need to set 
it to 'mysql' or 'oracle'.
+The compatible mode of database, required when the database supports multiple 
compatible modes.
+
+For example, when using OceanBase database, you need to set it to 'mysql' or 
'oracle'. when using StarRocks, you need set it to `starrocks`.
 
 Postgres 9.5 version or below,please set it to `postgresLow` to support cdc
 
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 7fab8d50b2..1b9acc025b 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -46,7 +46,7 @@ supports query SQL and can achieve projection effect.
 | user                                       | String  | No       | -          
     | userName                                                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | password                                   | String  | No       | -          
     | password                                                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | query                                      | String  | No       | -          
     | Query statement                                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| compatible_mode                            | String  | No       | -          
     | The compatible mode of database, required when the database supports 
multiple compatible modes. For example, when using OceanBase database, you need 
to set it to 'mysql' or 'oracle'.                                               
                                                                                
                                                                                
                  [...]
+| compatible_mode                            | String  | No       | -          
     | The compatible mode of database, required when the database supports 
multiple compatible modes.<br/> For example, when using OceanBase database, you 
need to set it to 'mysql' or 'oracle'. <br/> when using starrocks, you need set 
it to `starrocks`                                                               
                                                                                
                  [...]
 | connection_check_timeout_sec               | Int     | No       | 30         
     | The time in seconds to wait for the database operation used to validate 
the connection to complete.                                                     
                                                                                
                                                                                
                                                                                
               [...]
 | partition_column                           | String  | No       | -          
     | The column name for split data.                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | partition_upper_bound                      | Long    | No       | -          
     | The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.                                                         
                                                                                
                                                                                
                                                                                
              [...]
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
index d61292cb92..0cc8605a37 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -79,7 +79,9 @@ JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test`
 
 ### compatible_mode [string]
 
-数据库的兼容模式,当数据库支持多种兼容模式时需要。例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 。
+数据库的兼容模式,当数据库支持多种兼容模式时需要。
+
+例如,使用 OceanBase 数据库时,需要将其设置为 'mysql' 或 'oracle' 
。使用StarRocks时,需要将其设置为`starrocks`。
 
 Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index bf00298a74..45f849c28b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -25,6 +25,7 @@ public class DatabaseIdentifier {
     public static final String INFORMIX = "Informix";
     public static final String KINGBASE = "KingBase";
     public static final String MYSQL = "MySQL";
+    public static final String STARROCKS = "StarRocks";
     public static final String ORACLE = "Oracle";
     public static final String PHOENIX = "Phoenix";
     public static final String POSTGRESQL = "Postgres";
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
index a4f89a4dc8..f8278a60cc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks.StarRocksDialect;
 
 import com.google.auto.service.AutoService;
 
@@ -39,6 +41,9 @@ public class MySqlDialectFactory implements 
JdbcDialectFactory {
 
     @Override
     public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        if (DatabaseIdentifier.STARROCKS.equalsIgnoreCase(compatibleMode)) {
+            return new StarRocksDialect(fieldIde);
+        }
         return new MysqlDialect(fieldIde);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
similarity index 64%
copy from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
copy to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
index a4f89a4dc8..d7ee796527 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/starrocks/StarRocksDialect.java
@@ -15,30 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks;
 
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
 
-import com.google.auto.service.AutoService;
+public class StarRocksDialect extends MysqlDialect {
 
-import javax.annotation.Nonnull;
+    public StarRocksDialect() {}
 
-/** Factory for {@link MysqlDialect}. */
-@AutoService(JdbcDialectFactory.class)
-public class MySqlDialectFactory implements JdbcDialectFactory {
-    @Override
-    public boolean acceptsURL(String url) {
-        return url.startsWith("jdbc:mysql:");
+    public StarRocksDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
     }
 
     @Override
-    public JdbcDialect create() {
-        return new MysqlDialect();
+    public String dialectName() {
+        return DatabaseIdentifier.STARROCKS;
     }
 
     @Override
-    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
-        return new MysqlDialect(fieldIde);
+    public String hashModForField(String fieldName, int mod) {
+        return "ABS(md5sum_numeric(" + quoteIdentifier(fieldName) + ") % " + 
mod + ")";
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index e7fc94e642..1d41c480c3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -37,7 +37,7 @@ import java.util.List;
 
 public class JdbcStarRocksdbIT extends AbstractJdbcIT {
 
-    private static final String DOCKER_IMAGE = 
"d87904488/starrocks-starter:2.2.1";
+    private static final String DOCKER_IMAGE = 
"starrocks/allin1-ubuntu:2.5.12";
     private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
     private static final String NETWORK_ALIASES = "e2e_starRocksdb";
     private static final int SR_PORT = 9030;
@@ -51,7 +51,8 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
     private static final String SINK_TABLE = "e2e_table_sink";
 
     private static final List<String> CONFIG_FILE =
-            Lists.newArrayList("/jdbc_starrocks_source_to_sink.conf");
+            Lists.newArrayList(
+                    "/jdbc_starrocks_source_to_sink.conf", 
"/jdbc_starrocks_dialect.conf");
 
     private static final String CREATE_SQL =
             "create table %s (\n"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
new file mode 100644
index 0000000000..69fe5538f5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_starrocks_dialect.conf
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  jars = "file:///tmp/jars/mysql-connector-java-8.0.16.jar"
+}
+
+source {
+  Jdbc {
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://e2e_starRocksdb:9030"
+    user = root
+    password = ""
+    query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, 
BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, 
VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from 
`test`.`e2e_table_source`"
+    partition_column = "STRING_COL"
+    compatible_mode = "starrocks"
+  }
+}
+
+sink {
+  Jdbc {
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://e2e_starRocksdb:9030"
+    user = root
+    password = ""
+    query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, 
SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, 
INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) 
VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+}

Reply via email to