This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 399eabcd3f [Feature][Jdbc] Add Jdbc default dialect for all jdbc
series database without dialect (#8132)
399eabcd3f is described below
commit 399eabcd3feb38cf74bab0193fa11f9c2a80e993
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Nov 26 19:25:38 2024 +0800
[Feature][Jdbc] Add Jdbc default dialect for all jdbc series database
without dialect (#8132)
---
.../api/table/converter/BasicTypeDefine.java | 2 +
seatunnel-connectors-v2/connector-jdbc/pom.xml | 12 ++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../jdbc/internal/dialect/GenericDialect.java | 85 +++++++++
.../internal/dialect/GenericDialectFactory.java | 43 +++++
.../internal/dialect/GenericTypeConverter.java | 200 +++++++++++++++++++++
.../jdbc/internal/dialect/GenericTypeMapper.java | 39 ++++
.../jdbc/internal/dialect/JdbcDialectLoader.java | 23 +--
.../internal/dialect/JdbcDialectTypeMapper.java | 3 +
.../internal/dialect/JdbcDialectLoaderTest.java | 39 ++++
.../connector-jdbc-e2e-part-1/pom.xml | 12 ++
.../connectors/seatunnel/jdbc/JdbcMariaDBIT.java | 193 ++++++++++++++++++++
.../resources/jdbc_mariadb_source_and_sink.conf | 54 ++++++
.../jdbc_mariadb_source_using_table_path.conf | 51 ++++++
14 files changed, 742 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
index d15529e0a4..e7c3c04110 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/converter/BasicTypeDefine.java
@@ -31,6 +31,8 @@ public class BasicTypeDefine<T> implements Serializable {
protected String columnType;
// e.g. `varchar` for MySQL
protected String dataType;
+ // JDBC SQL type code (a java.sql.Types constant), not SeaTunnel's SqlType
+ protected int sqlType;
protected T nativeType;
// e.g. `varchar` length is 10
protected Long length;
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 60e324be4c..1f09fa2168 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -54,6 +54,8 @@
<iris.jdbc.version>3.0.0</iris.jdbc.version>
<tikv.version>3.2.0</tikv.version>
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
+ <mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
+
</properties>
<dependencyManagement>
@@ -215,6 +217,12 @@
<version>${opengauss.jdbc.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>${mariadb.jdbc.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
@@ -329,6 +337,10 @@
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index e2a32b4f3f..6f442e7672 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
public class DatabaseIdentifier {
+ public static final String GENERIC = "Generic";
public static final String DB_2 = "DB2";
public static final String DAMENG = "Dameng";
public static final String GBASE_8A = "Gbase8a";
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
new file mode 100644
index 0000000000..36c93d6546
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialect.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+/**
+ * Database-agnostic {@link JdbcDialect} identified as {@link DatabaseIdentifier#GENERIC}.
+ *
+ * <p>It supplies a row converter and type mapper that are not tied to a particular database and
+ * offers no native upsert statement.
+ */
+@Slf4j
+public class GenericDialect implements JdbcDialect {
+
+    /** Field-identifier mode handed to {@code getFieldIde}; starts as the ORIGINAL mode. */
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    /** Creates a dialect that keeps the default field-identifier mode. */
+    public GenericDialect() {}
+
+    /**
+     * Creates a dialect with an explicit field-identifier mode.
+     *
+     * @param fieldIde mode value later passed to {@code getFieldIde}
+     */
+    public GenericDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.GENERIC;
+    }
+
+    /**
+     * Returns a converter that inherits all behaviour from {@link AbstractJdbcRowConverter} and
+     * only contributes the generic converter name.
+     */
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new AbstractJdbcRowConverter() {
+            @Override
+            public String converterName() {
+                return DatabaseIdentifier.GENERIC;
+            }
+        };
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new GenericTypeMapper();
+    }
+
+    /** Applies only the configured field-identifier mode to the identifier. */
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return getFieldIde(identifier, fieldIde);
+    }
+
+    /** Returns the database identifier unchanged. */
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return identifier;
+    }
+
+    /** Builds the table identifier from the path's database name and table name only. */
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tableIdentifier(tablePath.getDatabaseName(), tablePath.getTableName());
+    }
+
+    /**
+     * The generic dialect cannot know any database-specific upsert syntax.
+     *
+     * @return always {@link Optional#empty()}, signalling that no native upsert statement is
+     *     available instead of failing the caller with an exception
+     */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+
+    @Override
+    public TablePath parse(String tablePath) {
+        return TablePath.of(tablePath, false);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
new file mode 100644
index 0000000000..f0d32988bf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+/**
+ * {@link JdbcDialectFactory} that produces {@link GenericDialect} instances.
+ *
+ * <p>Registered as a {@link JdbcDialectFactory} service through {@link AutoService}.
+ */
+@AutoService(JdbcDialectFactory.class)
+public class GenericDialectFactory implements JdbcDialectFactory {
+
+    /**
+     * Accepts every URL, because the generic dialect places no requirement on it.
+     *
+     * @param url the JDBC URL (not inspected)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean acceptsURL(String url) {
+        return true;
+    }
+
+    /** Creates a {@link GenericDialect} with its default field-identifier mode. */
+    @Override
+    public JdbcDialect create() {
+        return new GenericDialect();
+    }
+
+    /**
+     * Creates a {@link GenericDialect} with the given field-identifier mode.
+     *
+     * @param compatibleMode not used by this factory
+     * @param fieldIde field-identifier mode forwarded to the dialect
+     */
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
+        return new GenericDialect(fieldIde);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
new file mode 100644
index 0000000000..4143367d9f
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeConverter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Types;
+
+/**
+ * Database-agnostic {@link TypeConverter} that derives a SeaTunnel {@link Column} from the JDBC
+ * type code ({@link java.sql.Types}) carried by a {@link BasicTypeDefine}.
+ *
+ * <p>Only the read direction is implemented; {@link #reconvert(Column)} always throws.
+ */
+@Slf4j
+@AutoService(TypeConverter.class)
+public class GenericTypeConverter implements TypeConverter<BasicTypeDefine> {
+
+    public static final GenericTypeConverter DEFAULT_INSTANCE = new GenericTypeConverter();
+
+    public static final int MAX_PRECISION = 65;
+    public static final int DEFAULT_PRECISION = 38;
+    public static final int MAX_SCALE = MAX_PRECISION - 1;
+    public static final int DEFAULT_SCALE = 18;
+
+    @Override
+    public String identifier() {
+        return DatabaseIdentifier.GENERIC;
+    }
+
+    /**
+     * Convert an external system's type definition to {@link Column}.
+     *
+     * <p>The mapping is driven solely by {@code typeDefine.getSqlType()}. A missing scale on a
+     * NUMERIC or DECIMAL column is treated as {@code 0}. For a type code without a mapping a
+     * warning is logged and the column is built without a data type.
+     *
+     * @param typeDefine type define
+     * @return column
+     * @throws IllegalArgumentException if a DECIMAL column has no precision or a non-positive one
+     */
+    @Override
+    public Column convert(BasicTypeDefine typeDefine) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder()
+                        .name(typeDefine.getName())
+                        .nullable(typeDefine.isNullable())
+                        .defaultValue(typeDefine.getDefaultValue())
+                        .comment(typeDefine.getComment());
+        int sqlType = typeDefine.getSqlType();
+        switch (sqlType) {
+            case Types.NULL:
+                builder.dataType(BasicType.VOID_TYPE);
+                break;
+            case Types.BOOLEAN:
+                builder.dataType(BasicType.BOOLEAN_TYPE);
+                break;
+            case Types.BIT:
+                {
+                    Long bitLength = typeDefine.getLength();
+                    if (bitLength == null || bitLength <= 1) {
+                        // Unknown, non-positive or single-bit length is read as a boolean.
+                        builder.dataType(BasicType.BOOLEAN_TYPE);
+                    } else {
+                        builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                        // BIT(M) -> BYTE(ceil(M / 8))
+                        long byteLength = bitLength / 8;
+                        byteLength += bitLength % 8 > 0 ? 1 : 0;
+                        builder.columnLength(byteLength);
+                    }
+                    break;
+                }
+            case Types.TINYINT:
+                builder.dataType(BasicType.BYTE_TYPE);
+                break;
+            case Types.SMALLINT:
+                builder.dataType(BasicType.SHORT_TYPE);
+                break;
+            case Types.INTEGER:
+                builder.dataType(BasicType.INT_TYPE);
+                break;
+            case Types.BIGINT:
+                builder.dataType(BasicType.LONG_TYPE);
+                break;
+            case Types.REAL:
+            case Types.FLOAT:
+                builder.dataType(BasicType.FLOAT_TYPE);
+                break;
+            case Types.DOUBLE:
+                builder.dataType(BasicType.DOUBLE_TYPE);
+                break;
+            case Types.NUMERIC:
+                {
+                    Long numericPrecision = typeDefine.getPrecision();
+                    DecimalType decimalTypeForNumeric;
+                    if (numericPrecision != null && numericPrecision > 0) {
+                        // A null scale used to be auto-unboxed here and raised an NPE.
+                        decimalTypeForNumeric =
+                                new DecimalType(
+                                        numericPrecision.intValue(), scaleOrZero(typeDefine));
+                    } else {
+                        decimalTypeForNumeric = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+                    }
+                    builder.dataType(decimalTypeForNumeric);
+                    break;
+                }
+            case Types.DECIMAL:
+                {
+                    Long decimalPrecision = typeDefine.getPrecision();
+                    // Fail with a descriptive IllegalArgumentException instead of an NPE when
+                    // the precision is missing.
+                    Preconditions.checkArgument(
+                            decimalPrecision != null && decimalPrecision > 0,
+                            "DECIMAL column '%s' requires a positive precision but got %s",
+                            typeDefine.getName(),
+                            decimalPrecision);
+                    DecimalType decimalType;
+                    if (decimalPrecision > DEFAULT_PRECISION) {
+                        // Precision beyond the default is replaced by the default type.
+                        decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+                    } else {
+                        decimalType =
+                                new DecimalType(
+                                        decimalPrecision.intValue(), scaleOrZero(typeDefine));
+                    }
+                    builder.dataType(decimalType);
+                    builder.columnLength(Long.valueOf(decimalType.getPrecision()));
+                    builder.scale(decimalType.getScale());
+                    break;
+                }
+
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+            case Types.CLOB:
+            case Types.DATALINK:
+            case Types.NCLOB:
+            case Types.SQLXML:
+                builder.dataType(BasicType.STRING_TYPE);
+                break;
+
+            case Types.BINARY:
+            case Types.BLOB:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
+                    builder.columnLength(1L);
+                } else {
+                    builder.columnLength(typeDefine.getLength());
+                }
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                break;
+            case Types.DATE:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
+                break;
+            case Types.TIME:
+                builder.dataType(LocalTimeType.LOCAL_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+            case Types.TIMESTAMP:
+                builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                builder.scale(typeDefine.getScale());
+                break;
+
+            case Types.OTHER:
+            case Types.ARRAY:
+            case Types.JAVA_OBJECT:
+            case Types.DISTINCT:
+            case Types.STRUCT:
+            case Types.REF:
+            case Types.ROWID:
+            default:
+                // NOTE(review): no data type is set on this path, so the returned column has
+                // none; confirm that callers tolerate this.
+                log.warn(
+                        "JDBC type {} ({}) not currently supported",
+                        sqlType,
+                        typeDefine.getNativeType());
+        }
+        return builder.build();
+    }
+
+    /**
+     * Convert {@link Column} to an external system's type definition.
+     *
+     * <p>Not supported by the generic converter.
+     *
+     * @param column the column that would be converted
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public BasicTypeDefine reconvert(Column column) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "%s (%s) type doesn't have a mapping to the SQL database column type",
+                        column.getName(), column.getDataType().getSqlType().name()));
+    }
+
+    /** Returns the declared scale, or {@code 0} when the type definition carries none. */
+    private static int scaleOrZero(BasicTypeDefine typeDefine) {
+        return typeDefine.getScale() == null ? 0 : typeDefine.getScale().intValue();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
new file mode 100644
index 0000000000..bf33c40527
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericTypeMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+
+/** {@link JdbcDialectTypeMapper} that hands column mapping to a {@link GenericTypeConverter}. */
+public class GenericTypeMapper implements JdbcDialectTypeMapper {
+
+    /** Converter every mapping request is delegated to. */
+    private GenericTypeConverter typeConverter;
+
+    /**
+     * Creates a mapper backed by the supplied converter.
+     *
+     * @param typeConverter converter used by {@link #mappingColumn(BasicTypeDefine)}
+     */
+    public GenericTypeMapper(GenericTypeConverter typeConverter) {
+        this.typeConverter = typeConverter;
+    }
+
+    /** Creates a mapper backed by {@link GenericTypeConverter#DEFAULT_INSTANCE}. */
+    public GenericTypeMapper() {
+        this(GenericTypeConverter.DEFAULT_INSTANCE);
+    }
+
+    /**
+     * Maps a type definition to a column through the configured converter.
+     *
+     * @param typeDefine the type definition to map
+     * @return the column produced by the converter
+     */
+    @Override
+    public Column mappingColumn(BasicTypeDefine typeDefine) {
+        Column mappedColumn = typeConverter.convert(typeDefine);
+        return mappedColumn;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index 350a22e20c..7dc71835ae 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -61,24 +61,17 @@ public final class JdbcDialectLoader {
JdbcDialectFactory.class.getName()));
}
- final List<JdbcDialectFactory> matchingFactories =
+ List<JdbcDialectFactory> matchingFactories =
foundFactories.stream().filter(f ->
f.acceptsURL(url)).collect(Collectors.toList());
- if (matchingFactories.isEmpty()) {
- throw new JdbcConnectorException(
- JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
- String.format(
- "Could not find any jdbc dialect factory that can
handle url '%s' that implements '%s' in the classpath.\n\n"
- + "Available factories are:\n\n"
- + "%s",
- url,
- JdbcDialectFactory.class.getName(),
- foundFactories.stream()
- .map(f -> f.getClass().getName())
- .distinct()
- .sorted()
- .collect(Collectors.joining("\n"))));
+ // when several factories accept the URL, drop the generic fallback so a specific dialect wins
+ if (matchingFactories.size() > 1) {
+ matchingFactories =
+ matchingFactories.stream()
+ .filter(f -> !(f instanceof GenericDialectFactory))
+ .collect(Collectors.toList());
}
+
if (matchingFactories.size() > 1) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.NO_SUITABLE_DIALECT_FACTORY,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
index 7da3fdb843..0b87f7b0d9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectTypeMapper.java
@@ -68,6 +68,7 @@ public interface JdbcDialectTypeMapper extends Serializable {
.name(columnName)
.columnType(nativeType)
.dataType(nativeType)
+ .sqlType(metadata.getColumnType(colIndex))
.nullable(isNullable ==
ResultSetMetaData.columnNullable)
.length((long) precision)
.precision((long) precision)
@@ -93,6 +94,7 @@ public interface JdbcDialectTypeMapper extends Serializable {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
String nativeType = rs.getString("TYPE_NAME");
+ int sqlType = rs.getInt("DATA_TYPE");
int columnSize = rs.getInt("COLUMN_SIZE");
int decimalDigits = rs.getInt("DECIMAL_DIGITS");
int nullable = rs.getInt("NULLABLE");
@@ -102,6 +104,7 @@ public interface JdbcDialectTypeMapper extends Serializable
{
.name(columnName)
.columnType(nativeType)
.dataType(nativeType)
+ .sqlType(sqlType)
.length((long) columnSize)
.precision((long) columnSize)
.scale(decimalDigits)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
new file mode 100644
index 0000000000..84dd36acfb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Checks which dialect {@link JdbcDialectLoader} returns for a given JDBC URL. */
+public class JdbcDialectLoaderTest {
+
+    /** A URL with an unrecognised scheme is expected to resolve to the generic dialect. */
+    @Test
+    public void shouldFindGenericDialect() throws Exception {
+        JdbcDialect loaded = JdbcDialectLoader.load("jdbc:someting:", "");
+        boolean isGeneric = loaded instanceof GenericDialect;
+        Assertions.assertTrue(isGeneric);
+    }
+
+    /** A MySQL URL is expected to resolve to the MySQL dialect, not the generic one. */
+    @Test
+    public void shouldFindMysqlDialect() throws Exception {
+        JdbcDialect loaded = JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", "");
+        boolean isMysql = loaded instanceof MysqlDialect;
+        Assertions.assertTrue(isMysql);
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
index db678f7dc8..39d34e6215 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/pom.xml
@@ -59,6 +59,12 @@
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mariadb</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- drivers -->
<dependency>
@@ -91,6 +97,12 @@
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
new file mode 100644
index 0000000000..143d9c76e0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMariaDBIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MariaDBContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** This class is used to test the Generic dialect with MariaDB. */
+public class JdbcMariaDBIT extends AbstractJdbcIT {
+    private static final String MARIADB_CONTAINER_HOST = "mariadb-e2e";
+    private static final int MARIADB_PORT = 3306;
+    private static final String MARIADB_IMAGE = "mariadb:11.6.2-ubi9";
+    private static final String MARIADB_DRIVER = "org.mariadb.jdbc.Driver";
+    private static final String MARIADB_URL = "jdbc:mariadb://" + HOST + ":%s/%s";
+    private static final String MARIADB_DATABASE_NAME = "seatunnel";
+    // Credentials of the throw-away test container; they match the *.conf job files.
+    private static final String MARIADB_USER = "mariadb_user";
+    private static final String MARIADB_PASSWORD = "mariadb_password";
+
+    private static final String MARIADB_SOURCE = "source";
+    private static final String MARIADB_SINK = "sink";
+    private static final String CATALOG_DATABASE = "catalog_database";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList(
+                    "/jdbc_mariadb_source_and_sink.conf",
+                    "/jdbc_mariadb_source_using_table_path.conf");
+
+    /** Columns of the test table, in the order they are declared in {@link #CREATE_SQL}. */
+    private static final String[] FIELD_NAMES = {
+        "c_int",
+        "c_varchar",
+        "c_text",
+        "c_float",
+        "c_double",
+        "c_date",
+        "c_datetime",
+        "c_timestamp"
+    };
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE IF NOT EXISTS %s\n"
+                    + "(\n"
+                    + " `c_int` INT DEFAULT NULL,\n"
+                    + " `c_varchar` varchar(255) DEFAULT NULL,\n"
+                    + " `c_text` text DEFAULT NULL,\n"
+                    + " `c_float` float DEFAULT NULL,\n"
+                    + " `c_double` double DEFAULT NULL,\n"
+                    + " `c_date` date DEFAULT NULL,\n"
+                    + " `c_datetime` datetime DEFAULT NULL,\n"
+                    + " `c_timestamp` timestamp DEFAULT NULL\n"
+                    + ");";
+
+    /** Describes the container, connection settings, tables, job files and test data. */
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(MARIADB_URL, MARIADB_PORT, MARIADB_DATABASE_NAME);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(MARIADB_DATABASE_NAME, MARIADB_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(MARIADB_IMAGE)
+                .networkAliases(MARIADB_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(MARIADB_DRIVER)
+                .host(HOST)
+                .port(MARIADB_PORT)
+                .localPort(MARIADB_PORT)
+                .jdbcTemplate(MARIADB_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(MARIADB_USER)
+                .password(MARIADB_PASSWORD)
+                .database(MARIADB_DATABASE_NAME)
+                .sourceTable(MARIADB_SOURCE)
+                .sinkTable(MARIADB_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .catalogDatabase(CATALOG_DATABASE)
+                .catalogTable(MARIADB_SINK)
+                .tablePathFullName(MARIADB_DATABASE_NAME + "." + MARIADB_SOURCE)
+                .build();
+    }
+
+    /** Compares all test columns through {@code defaultCompare}, passing {@code c_int} as key. */
+    @Override
+    protected void checkResult(
+            String executeKey, TestContainer container, Container.ExecResult execResult) {
+        defaultCompare(executeKey, FIELD_NAMES.clone(), "c_int");
+    }
+
+    /** Download location of the MariaDB JDBC driver jar (version 3.5.1). */
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.5.1/mariadb-java-client-3.5.1.jar";
+    }
+
+    /**
+     * Builds 100 rows whose {@code c_int} runs from 0 to 99.
+     *
+     * @return the column names paired with the generated rows
+     */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        // Hand out a copy so callers cannot modify the shared constant.
+        String[] fieldNames = FIELD_NAMES.clone();
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            String varcharValue = String.format("varchar_value_%d", i);
+            String textValue = String.format("text_value_%d", i);
+            float floatValue = 1.1f;
+            double doubleValue = 1.1;
+            LocalDate localDate = LocalDate.now();
+            LocalDateTime localDateTime = LocalDateTime.now();
+
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, // int
+                                varcharValue, // varchar
+                                textValue, // text
+                                floatValue, // float
+                                doubleValue, // double
+                                Date.valueOf(localDate), // date
+                                Timestamp.valueOf(localDateTime), // datetime
+                                new Timestamp(System.currentTimeMillis()) // timestamp
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    /** Starts MariaDB on the shared network with port 3306 bound to the same host port. */
+    @Override
+    GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(MARIADB_IMAGE);
+        // Diamond instead of the raw type keeps the fluent chain free of unchecked warnings.
+        GenericContainer<?> container =
+                new MariaDBContainer<>(imageName)
+                        .withUsername(MARIADB_USER)
+                        .withPassword(MARIADB_PASSWORD)
+                        .withDatabaseName(MARIADB_DATABASE_NAME)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MARIADB_CONTAINER_HOST)
+                        .withExposedPorts(MARIADB_PORT)
+                        .waitingFor(Wait.forHealthcheck())
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MARIADB_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%d:%d", MARIADB_PORT, MARIADB_PORT)));
+        return container;
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf
new file mode 100644
index 0000000000..c2ac75159c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_and_sink.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that copies the MariaDB table `source` into the table `sink`
+# using explicit SQL on both sides.
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+# Reads every row of `source` through a plain SELECT statement.
+source {
+ jdbc {
+ url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel"
+ driver = "org.mariadb.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "mariadb_user"
+ password = "mariadb_password"
+
+ query = "select * from source;"
+ properties {
+ useSSL=false
+ rewriteBatchedStatements=true
+ }
+ }
+}
+
+# No transformation is applied between source and sink.
+transform {
+}
+
+# Writes into `sink` with a hand-written INSERT that has one placeholder per column.
+sink {
+ jdbc {
+ url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel"
+ driver = "org.mariadb.jdbc.Driver"
+ user = "mariadb_user"
+ password = "mariadb_password"
+ query = """insert into sink
(c_int,c_varchar,c_text,c_float,c_double,c_date,c_datetime,c_timestamp) values
(?, ?, ?, ?, ?, ?, ?, ?);"""
+ properties {
+ useSSL=false
+ rewriteBatchedStatements=true
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf
new file mode 100644
index 0000000000..a2fa37c39b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mariadb_source_using_table_path.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Batch job that copies the MariaDB table `seatunnel.source` into `seatunnel.sink`
+# without any hand-written SQL.
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+# Selects the source table by `table_path` instead of a query.
+# NOTE(review): the split.* values presumably tune how the table read is split; confirm
+# their exact meaning against the JDBC source documentation.
+source {
+ jdbc {
+ url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel?useSSL=false"
+ driver = "org.mariadb.jdbc.Driver"
+ connection_check_timeout_sec = 100
+ user = "mariadb_user"
+ password = "mariadb_password"
+
+ table_path = "seatunnel.source"
+ split.size = 8096
+ split.even-distribution.factor.upper-bound = 100
+ split.even-distribution.factor.lower-bound = 0.05
+ split.sample-sharding.threshold = 1000
+ split.inverse-sampling.rate = 1000
+ }
+}
+
+# Names the target database and table and sets generate_sink_sql = true instead of
+# supplying an INSERT statement.
+sink {
+ jdbc {
+ url = "jdbc:mariadb://mariadb-e2e:3306/seatunnel?useSSL=false"
+ driver = "org.mariadb.jdbc.Driver"
+ user = "mariadb_user"
+ password = "mariadb_password"
+
+ database = "seatunnel"
+ table = "sink"
+ generate_sink_sql = true
+ }
+}
\ No newline at end of file