This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 399eabcd3f [Feature][Jdbc] Add Jdbc default dialect for all jdbc series database without dialect (#8132)
399eabcd3f is described below

commit 399eabcd3feb38cf74bab0193fa11f9c2a80e993
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Nov 26 19:25:38 2024 +0800

    [Feature][Jdbc] Add Jdbc default dialect for all jdbc series database without dialect (#8132)
---
 .../api/table/converter/BasicTypeDefine.java       |   2 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  12 ++
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |   1 +
 .../jdbc/internal/dialect/GenericDialect.java      |  85 +++++++++
 .../internal/dialect/GenericDialectFactory.java    |  43 +++++
 .../internal/dialect/GenericTypeConverter.java     | 200 +++++++++++++++++++++
 .../jdbc/internal/dialect/GenericTypeMapper.java   |  39 ++++
 .../jdbc/internal/dialect/JdbcDialectLoader.java   |  23 +--
 .../internal/dialect/JdbcDialectTypeMapper.java    |   3 +
 .../internal/dialect/JdbcDialectLoaderTest.java    |  39 ++++
 .../connector-jdbc-e2e-part-1/pom.xml              |  12 ++
 .../connectors/seatunnel/jdbc/JdbcMariaDBIT.java   | 193 ++++++++++++++++++++
 .../resources/jdbc_mariadb_source_and_sink.conf    |  54 ++++++
 .../jdbc_mariadb_source_using_table_path.conf      |  51 ++++++
 14 files changed, 742 insertions(+), 15 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
index d15529e0a4..e7c3c04110 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
@@ -31,6 +31,8 @@ public class BasicTypeDefine<T> implements Serializable {
     protected String columnType;
     // e.g. `varchar` for MySQL
     protected String dataType;
+    // JDBC SQL type code (see java.sql.Types), not the SeaTunnel SqlType
+    protected int sqlType;
     protected T nativeType;
     // e.g. `varchar` length is 10
     protected Long length;
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 60e324be4c..1f09fa2168 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -54,6 +54,8 @@
         <iris.jdbc.version>3.0.0</iris.jdbc.version>
         <tikv.version>3.2.0</tikv.version>
         <opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
+        <mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
+
     </properties>
 
     <dependencyManagement>
@@ -215,6 +217,12 @@
                 <version>${opengauss.jdbc.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.mariadb.jdbc</groupId>
+                <artifactId>mariadb-java-client</artifactId>
+                <version>${mariadb.jdbc.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -329,6 +337,10 @@
             <groupId>org.opengauss</groupId>
             <artifactId>opengauss-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index e2a32b4f3f..6f442e7672 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
 public class DatabaseIdentifier {
+    public static final String GENERIC = "Generic";
     public static final String DB_2 = "DB2";
     public static final String DAMENG = "Dameng";
     public static final String GBASE_8A = "Gbase8a";
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
new file mode 100644
index 0000000000..36c93d6546
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+@Slf4j
+public class GenericDialect implements JdbcDialect {
+
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    public GenericDialect() {}
+
+    public GenericDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.GENERIC;
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new AbstractJdbcRowConverter() {
+            @Override
+            public String converterName() {
+                return DatabaseIdentifier.GENERIC;
+            }
+        };
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new GenericTypeMapper();
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return getFieldIde(identifier, fieldIde);
+    }
+
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TablePath parse(String tablePath) {
+        return TablePath.of(tablePath, false);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
new file mode 100644
index 0000000000..f0d32988bf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+/** Factory for {@link GenericDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class GenericDialectFactory implements JdbcDialectFactory {
+
+    // GenericDialect does not have any special requirements.
+    @Override
+    public boolean acceptsURL(String url) {
+        return true;
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new GenericDialect();
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
+        return new GenericDialect(fieldIde);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
new file mode 100644
index 0000000000..4143367d9f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Types;
+
+@Slf4j
+@AutoService(TypeConverter.class)
+public class GenericTypeConverter implements TypeConverter<BasicTypeDefine> {
+
+    public static final GenericTypeConverter DEFAULT_INSTANCE = new GenericTypeConverter();
+
+    public static final int MAX_PRECISION = 65;
+    public static final int DEFAULT_PRECISION = 38;
+    public static final int MAX_SCALE = MAX_PRECISION - 1;
+    public static final int DEFAULT_SCALE = 18;
+
+    @Override
+    public String identifier() {
+        return DatabaseIdentifier.GENERIC;
+    }
+
+    /**
+     * Convert an external system's type definition to {@link Column}.
+     *
+     * @param typeDefine type define
+     * @return column
+     */
+    @Override
+    public Column convert(BasicTypeDefine typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+        int sqlType = typeDefine.getSqlType();
+        switch (sqlType) {
+            case Types.NULL:
+                builder.dataType(BasicType.VOID_TYPE);
+                break;
+            case Types.BOOLEAN:
+                builder.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case Types.BIT:
+                if (typeDefine.getLength() == null
+                        || typeDefine.getLength() <= 0
+                        || typeDefine.getLength() == 1) {
+                    builder.dataType(BasicType.BOOLEAN_TYPE);
+                } else {
+                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                    // BIT(M) -> BYTE(ceil(M / 8)): round up so a partial byte still counts
+                    long byteLength = typeDefine.getLength() / 8;
+                    byteLength += typeDefine.getLength() % 8 > 0 ? 1 : 0;
+                    builder.columnLength(byteLength);
+                }
+                break;
+            case Types.TINYINT:
+                builder.dataType(BasicType.BYTE_TYPE);
+                break;
+            case Types.SMALLINT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case Types.INTEGER:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case Types.BIGINT:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case Types.REAL:
+            case Types.FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case Types.DOUBLE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case Types.NUMERIC:
+                DecimalType decimalTypeForNumeric;
+                if (typeDefine.getPrecision() != null && typeDefine.getPrecision() > 0) {
+                    decimalTypeForNumeric =
+                            new DecimalType(
+                                    typeDefine.getPrecision().intValue(), typeDefine.getScale());
+                } else {
+                    decimalTypeForNumeric = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+                }
+                builder.dataType(decimalTypeForNumeric);
+                break;
+            case Types.DECIMAL:
+                Preconditions.checkArgument(typeDefine.getPrecision() > 0);
+                DecimalType decimalType;
+                if (typeDefine.getPrecision() > DEFAULT_PRECISION) {
+                    decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+                } else {
+                    decimalType =
+                            new DecimalType(
+                                    typeDefine.getPrecision().intValue(),
+                                    typeDefine.getScale() == null
+                                            ? 0
+                                            : typeDefine.getScale().intValue());
+                }
+                builder.dataType(decimalType);
+                builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+                builder.scale(decimalType.getScale());
+                break;
+
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+            case Types.CLOB:
+            case Types.DATALINK:
+            case Types.NCLOB:
+            case Types.SQLXML:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+
+            case Types.BINARY:
+            case Types.BLOB:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+                    builder.columnLength(1L);
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                break;
+            case Types.DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+                break;
+            case Types.TIME:
+                builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+            case Types.TIMESTAMP:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+
+            case Types.OTHER:
+            case Types.ARRAY:
+            case Types.JAVA_OBJECT:
+            case Types.DISTINCT:
+            case Types.STRUCT:
+            case Types.REF:
+            case Types.ROWID:
+            default:
+                log.warn(
+                        "JDBC type {} ({}) not currently supported",
+                        sqlType,
+                        typeDefine.getNativeType());
+        }
+        return builder.build();
+    }
+
+    /**
+     * Convert {@link Column} to an external system's type definition.
+     *
+     * @param column the column to convert
+     * @return nothing; this implementation always throws UnsupportedOperationException
+     */
+    @Override
+    public BasicTypeDefine reconvert(Column column) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "%s (%s) type doesn't have a mapping to the SQL database column type",
+                        column.getName(), column.getDataType().getSqlType().name()));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
new file mode 100644
index 0000000000..bf33c40527
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+
+public class GenericTypeMapper implements JdbcDialectTypeMapper {
+
+    private GenericTypeConverter typeConverter;
+
+    public GenericTypeMapper() {
+        this(GenericTypeConverter.DEFAULT_INSTANCE);
+    }
+
+    public GenericTypeMapper(GenericTypeConverter typeConverter) {
+        this.typeConverter = typeConverter;
+    }
+
+    @Override
+    public Column mappingColumn(BasicTypeDefine typeDefine) {
+        return typeConverter.convert(typeDefine);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index 350a22e20c..7dc71835ae 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -61,24 +61,17 @@ public final class JdbcDialectLoader {
                             JdbcDialectFactory.class.getName()));
         }
 
-        final List<JdbcDialectFactory> matchingFactories =
+        List<JdbcDialectFactory> matchingFactories =
                 foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
 
-        if (matchingFactories.isEmpty()) {
-            throw new JdbcConnectorException(
-                    JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
-                    String.format(
-                            "Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n"
-                                    + "Available factories are:\n\n"
-                                    + "%s",
-                            url,
-                            JdbcDialectFactory.class.getName(),
-                            foundFactories.stream()
-                                    .map(f -> f.getClass().getName())
-                                    .distinct()
-                                    .sorted()
-                                    .collect(Collectors.joining("\n"))));
+        // filter out generic dialect factory
+        if (matchingFactories.size() > 1) {
+            matchingFactories =
+                    matchingFactories.stream()
+                            .filter(f -> !(f instanceof GenericDialectFactory))
+                            .collect(Collectors.toList());
         }
+
         if (matchingFactories.size() > 1) {
             throw new JdbcConnectorException(
                     JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
index 7da3fdb843..0b87f7b0d9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -68,6 +68,7 @@ public interface JdbcDialectTypeMapper extends Serializable {
                         .name(columnName)
                         .columnType(nativeType)
                         .dataType(nativeType)
+                        .sqlType(metadata.getColumnType(colIndex))
                         .nullable(isNullable == ResultSetMetaData.columnNullable)
                         .length((long) precision)
                         .precision((long) precision)
@@ -93,6 +94,7 @@ public interface JdbcDialectTypeMapper extends Serializable {
             while (rs.next()) {
                 String columnName = rs.getString("COLUMN_NAME");
                 String nativeType = rs.getString("TYPE_NAME");
+                int sqlType = rs.getInt("DATA_TYPE");
                 int columnSize = rs.getInt("COLUMN_SIZE");
                 int decimalDigits = rs.getInt("DECIMAL_DIGITS");
                 int nullable = rs.getInt("NULLABLE");
@@ -102,6 +104,7 @@ public interface JdbcDialectTypeMapper extends Serializable {
                                 .name(columnName)
                                 .columnType(nativeType)
                                 .dataType(nativeType)
+                                .sqlType(sqlType)
                                 .length((long) columnSize)
                                 .precision((long) columnSize)
                                 .scale(decimalDigits)
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
new file mode 100644
index 0000000000..84dd36acfb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Test for {@link JdbcDialectLoader} */
+public class JdbcDialectLoaderTest {
+    @Test
+    public void shouldFindGenericDialect() throws Exception {
+        JdbcDialect jdbcDialect = JdbcDialectLoader.load("jdbc:someting:", "");
+        Assertions.assertTrue(jdbcDialect instanceof GenericDialect);
+    }
+
+    @Test
+    public void shouldFindMysqlDialect() throws Exception {
+        JdbcDialect jdbcDialect = JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", "");
+        Assertions.assertTrue(jdbcDialect instanceof MysqlDialect);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
index db678f7dc8..39d34e6215 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
@@ -59,6 +59,12 @@
             <version>${testcontainer.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mariadb</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <!-- drivers -->
         <dependency>
@@ -91,6 +97,12 @@
             <artifactId>postgresql</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
new file mode 100644
index 0000000000..143d9c76e0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MariaDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class is used to test the Generic dialect with MariaDB. */
+public class JdbcMariaDBIT extends AbstractJdbcIT {
+    private static final String MARIADB_CONTAINER_HOST = "mariadb-e2e";
+    private static final int MARIADB_PORT = 3306;
+    private static final String MARIADB_IMAGE =
+            "mariadb:11.6.2-ubi9"; // Use the appropriate version
+    private static final String MARIADB_DRIVER = "org.mariadb.jdbc.Driver";
+    private static final String MARIADB_URL = "jdbc:mariadb://" + HOST + ":%s/%s";
+    private static final String MARIADB_DATABASE_NAME = "seatunnel";
+    private static final String MARIADB_USER = "mariadb_user"; // Replace with your username
+    private static final String MARIADB_PASSWORD = "mariadb_password"; // Replace with your password
+
+    private static final String MARIADB_SOURCE = "source";
+    private static final String MARIADB_SINK = "sink";
+    private static final String CATALOG_DATABASE = "catalog_database";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList(
+                    "/jdbc_mariadb_source_and_sink.conf",
+                    "/jdbc_mariadb_source_using_table_path.conf");
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s\n"
+                    + "(\n"
+                    + "    `c_int`                  INT                  DEFAULT NULL,\n"
+                    + "    `c_varchar`              varchar(255)         DEFAULT NULL,\n"
+                    + "    `c_text`                 text                 DEFAULT NULL,\n"
+                    + "    `c_float`                float                DEFAULT NULL,\n"
+                    + "    `c_double`               double               DEFAULT NULL,\n"
+                    + "    `c_date`                 date                 DEFAULT NULL,\n"
+                    + "    `c_datetime`             datetime             DEFAULT NULL,\n"
+                    + "    `c_timestamp`            timestamp            DEFAULT NULL\n"
+                    + ");";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(MARIADB_URL, MARIADB_PORT, MARIADB_DATABASE_NAME);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(MARIADB_DATABASE_NAME, MARIADB_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(MARIADB_IMAGE)
+                .networkAliases(MARIADB_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(MARIADB_DRIVER)
+                .host(HOST)
+                .port(MARIADB_PORT)
+                .localPort(MARIADB_PORT)
+                .jdbcTemplate(MARIADB_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(MARIADB_USER)
+                .password(MARIADB_PASSWORD)
+                .database(MARIADB_DATABASE_NAME)
+                .sourceTable(MARIADB_SOURCE)
+                .sinkTable(MARIADB_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .catalogDatabase(CATALOG_DATABASE)
+                .catalogTable(MARIADB_SINK)
+                .tablePathFullName(MARIADB_DATABASE_NAME + "." + MARIADB_SOURCE)
+                .build();
+    }
+
+    @Override
+    protected void checkResult(
+            String executeKey, TestContainer container, Container.ExecResult execResult) {
+        String[] fieldNames =
+                new String[] {
+                    "c_int",
+                    "c_varchar",
+                    "c_text",
+                    "c_float",
+                    "c_double",
+                    "c_date",
+                    "c_datetime",
+                    "c_timestamp"
+                };
+        defaultCompare(executeKey, fieldNames, "c_int");
+    }
+
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.5.1/mariadb-java-client-3.5.1.jar"; // Use the appropriate version
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "c_int",
+                    "c_varchar",
+                    "c_text",
+                    "c_float",
+                    "c_double",
+                    "c_date",
+                    "c_datetime",
+                    "c_timestamp"
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            String varcharValue = String.format("varchar_value_%d", i);
+            String textValue = String.format("text_value_%d", i);
+            float floatValue = 1.1f;
+            double doubleValue = 1.1;
+            LocalDate localDate = LocalDate.now();
+            LocalDateTime localDateTime = LocalDateTime.now();
+
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, // int
+                                varcharValue, // varchar
+                                textValue, // text
+                                floatValue, // float
+                                doubleValue, // double
+                                Date.valueOf(localDate), // date
+                                Timestamp.valueOf(localDateTime), // datetime
+                                new Timestamp(System.currentTimeMillis()) // timestamp
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(MARIADB_IMAGE);
+        GenericContainer<?> container =
+                new MariaDBContainer(imageName)
+                        .withUsername(MARIADB_USER)
+                        .withPassword(MARIADB_PASSWORD)
+                        .withDatabaseName(MARIADB_DATABASE_NAME)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MARIADB_CONTAINER_HOST)
+                        .withExposedPorts(MARIADB_PORT)
+                        .waitingFor(Wait.forHealthcheck())
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MARIADB_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%d:%d", MARIADB_PORT, MARIADB_PORT)));
+        return container;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf
new file mode 100644
index 0000000000..c2ac75159c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel"
+    driver = "org.mariadb.jdbc.Driver"
+    connection_check_timeout_sec = 100
+    user = "mariadb_user"
+    password = "mariadb_password"
+
+    query = "select * from source;"
+     properties {
+       useSSL=false
+       rewriteBatchedStatements=true
+     }
+  }
+}
+
+transform {
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel"
+    driver = "org.mariadb.jdbc.Driver"
+    user = "mariadb_user"
+    password = "mariadb_password"
+    query = """insert into sink (c_int,c_varchar,c_text,c_float,c_double,c_date,c_datetime,c_timestamp) values (?, ?, ?, ?, ?, ?, ?, ?);"""
+    properties {
+     useSSL=false
+     rewriteBatchedStatements=true
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf
new file mode 100644
index 0000000000..a2fa37c39b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  jdbc {
+    url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel?useSSL=false"
+    driver = "org.mariadb.jdbc.Driver"
+    connection_check_timeout_sec = 100
+    user = "mariadb_user"
+    password = "mariadb_password"
+
+    table_path = "seatunnel.source"
+    split.size = 8096
+    split.even-distribution.factor.upper-bound = 100
+    split.even-distribution.factor.lower-bound = 0.05
+    split.sample-sharding.threshold = 1000
+    split.inverse-sampling.rate = 1000
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel?useSSL=false"
+    driver = "org.mariadb.jdbc.Driver"
+    user = "mariadb_user"
+    password = "mariadb_password"
+
+    database = "seatunnel"
+    table = "sink"
+    generate_sink_sql = true
+  }
+}
\ No newline at end of file


Reply via email to