This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 1de056a9a4 [Improve][Connector-V2] The interface supports jdbc respects the target database field type (#8031) 1de056a9a4 is described below commit 1de056a9a4b3fb96106ac6e8f290b21818cd775e Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Thu Dec 5 13:26:41 2024 +0800 [Improve][Connector-V2] The interface supports jdbc respects the target database field type (#8031) --- .../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 97 +++++++----- .../jdbc/internal/JdbcOutputFormatBuilder.java | 72 +++++++-- .../converter/AbstractJdbcRowConverter.java | 172 ++++++++++++--------- .../jdbc/internal/converter/JdbcRowConverter.java | 13 ++ .../dialect/hive/HiveJdbcRowConverter.java | 7 +- .../dialect/inceptor/InceptorJdbcRowConverter.java | 7 +- .../dialect/kingbase/KingbaseJdbcRowConverter.java | 7 +- .../oceanbase/OceanBaseMysqlJdbcRowConverter.java | 7 +- .../dialect/oracle/OracleJdbcRowConverter.java | 30 ++++ .../dialect/psql/PostgresJdbcRowConverter.java | 7 +- .../executor/FieldNamedPreparedStatement.java | 4 +- .../InsertOrUpdateBatchStatementExecutor.java | 11 +- .../executor/SimpleBatchStatementExecutor.java | 5 +- .../jdbc/sink/AbstractJdbcSinkWriter.java | 7 +- .../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 5 +- .../connectors/seatunnel/jdbc/sink/JdbcSink.java | 85 ++++++++-- .../seatunnel/jdbc/sink/JdbcSinkWriter.java | 14 +- .../connectors/seatunnel/jdbc/JdbcOracleIT.java | 11 ++ .../test/resources/jdbc_oracle_source_to_sink.conf | 4 +- .../jdbc_oracle_source_to_sink_use_select1.conf | 4 +- .../jdbc_oracle_source_to_sink_use_select2.conf | 2 +- .../jdbc_oracle_source_to_sink_use_select3.conf | 2 +- 22 files changed, 416 insertions(+), 157 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java index 04b15dfe1c..ba237d3bfd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java @@ -21,7 +21,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; @@ -34,7 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTy import lombok.extern.slf4j.Slf4j; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -44,6 +42,23 @@ import java.util.List; @Slf4j public class DamengCatalog extends AbstractJdbcCatalog { + private static final String SELECT_COLUMNS_SQL = + "SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE " + + ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, COMMENTS.COMMENTS ," + + "CASE \n" + + " WHEN COLUMNS.DATA_TYPE IN ('CHAR', 'CHARACTER', 'VARCHAR', 'VARCHAR2', 'VARBINARY', 'BINARY') THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_LENGTH || ')'\n" + + " WHEN COLUMNS.DATA_TYPE IN ('NUMERIC', 'DECIMAL', 'NUMBER') AND COLUMNS.DATA_PRECISION IS NOT NULL AND COLUMNS.DATA_SCALE IS NOT NULL AND COLUMNS.DATA_PRECISION != 0 AND COLUMNS.DATA_SCALE != 0 THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_PRECISION || ', ' || COLUMNS.DATA_SCALE || ')'\n" + + " ELSE COLUMNS.DATA_TYPE\n" + + " END AS SOURCE_TYPE \n" + + "FROM ALL_TAB_COLUMNS COLUMNS " + + "LEFT JOIN ALL_COL_COMMENTS COMMENTS " + + "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME " + + "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME " + + "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME " + + "WHERE COLUMNS.OWNER = '%s' " + + "AND COLUMNS.TABLE_NAME = '%s' " + + "ORDER BY COLUMNS.COLUMN_ID ASC"; + public DamengCatalog( String catalogName, String username, @@ -53,6 +68,23 @@ public class DamengCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo, defaultSchema); } + @Override + protected void createDatabaseInternal(String databaseName) { + throw new UnsupportedOperationException(); + } + + @Override + protected void dropDatabaseInternal(String databaseName) throws CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public String getExistDataSql(TablePath tablePath) { + return String.format( + "select * from \"%s\".\"%s\" LIMIT 1", + tablePath.getSchemaName(), tablePath.getTableName()); + } + @Override protected String getDatabaseWithConditionSql(String databaseName) { return String.format(getListDatabaseSql() + " where name = '%s'", databaseName); @@ -98,19 +130,34 @@ public class DamengCatalog extends AbstractJdbcCatalog { return rs.getString(1) + "." + rs.getString(2); } + @Override + protected String getSelectColumnsSql(TablePath tablePath) { + return String.format( + SELECT_COLUMNS_SQL, tablePath.getSchemaName(), tablePath.getTableName()); + } + + @Override + protected String getTruncateTableSql(TablePath tablePath) { + return String.format( + "TRUNCATE TABLE \"%s\".\"%s\"", + tablePath.getSchemaName(), tablePath.getTableName()); + } + @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); - String typeName = resultSet.getString("TYPE_NAME"); - Long columnLength = resultSet.getLong("COLUMN_SIZE"); - Long columnPrecision = columnLength; - Integer columnScale = resultSet.getObject("DECIMAL_DIGITS", Integer.class); - String columnComment = resultSet.getString("REMARKS"); - Object defaultValue = resultSet.getObject("COLUMN_DEF"); - boolean isNullable = (resultSet.getInt("NULLABLE") == DatabaseMetaData.columnNullable); + String typeName = resultSet.getString("DATA_TYPE"); + long columnLength = resultSet.getLong("DATA_LENGTH"); + long columnPrecision = resultSet.getLong("DATA_PRECISION"); + int columnScale = resultSet.getInt("DATA_SCALE"); + String columnComment = resultSet.getString("COMMENTS"); + Object defaultValue = resultSet.getObject("DATA_DEFAULT"); + boolean isNullable = resultSet.getString("NULLABLE").equals("Y"); + BasicTypeDefine typeDefine = BasicTypeDefine.builder() .name(columnName) + .columnType(typeName) .dataType(typeName) .length(columnLength) .precision(columnPrecision) @@ -132,6 +179,11 @@ public class DamengCatalog extends AbstractJdbcCatalog { return tablePath.getSchemaAndTableName(); } + private List<String> listTables() { + List<String> databases = listDatabases(); + return listTables(databases.get(0)); + } + @Override public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException { @@ -161,31 +213,4 @@ public class DamengCatalog extends AbstractJdbcCatalog { Connection defaultConnection = getConnection(defaultUrl); return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new DmdbTypeMapper()); } - - @Override - protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder( - TablePath tablePath, Connection conn) throws SQLException { - TableSchema.Builder columnsBuilder = TableSchema.builder(); - DatabaseMetaData metaData = conn.getMetaData(); - try (ResultSet resultSet = - metaData.getColumns( - null, tablePath.getSchemaName(), tablePath.getTableName(), null)) { - buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder); - } - return columnsBuilder; - } - - @Override - protected String getTruncateTableSql(TablePath tablePath) { - return String.format( - "TRUNCATE TABLE \"%s\".\"%s\"", - tablePath.getSchemaName(), tablePath.getTableName()); - } - - @Override - protected String getExistDataSql(TablePath tablePath) { - return String.format( - "select * from \"%s\".\"%s\" WHERE rownum = 1", - tablePath.getSchemaName(), tablePath.getTableName()); - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java index dee1b58e0e..7748823ca4 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java @@ -39,6 +39,8 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -53,6 +55,7 @@ public class JdbcOutputFormatBuilder { @NonNull private final JdbcConnectionProvider connectionProvider; @NonNull private final JdbcSinkConfig jdbcSinkConfig; @NonNull private final TableSchema tableSchema; + @Nullable private final TableSchema databaseTableSchema; public JdbcOutputFormat build() { JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory; @@ -76,10 +79,13 @@ public class JdbcOutputFormatBuilder { createSimpleBufferedExecutor( jdbcSinkConfig.getSimpleSql(), tableSchema, + databaseTableSchema, dialect.getRowConverter()); } else if (primaryKeys == null || primaryKeys.isEmpty()) { statementExecutorFactory = - () -> createSimpleBufferedExecutor(dialect, database, table, tableSchema); + () -> + createSimpleBufferedExecutor( + dialect, database, table, tableSchema, databaseTableSchema); } else { statementExecutorFactory = () -> @@ -88,6 +94,7 @@ public class JdbcOutputFormatBuilder { database, table, tableSchema, + databaseTableSchema, primaryKeys.toArray(new String[0]), jdbcSinkConfig.isEnableUpsert(), jdbcSinkConfig.isPrimaryKeyUpdated(), @@ -101,16 +108,24 @@ public class JdbcOutputFormatBuilder { } private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor( - JdbcDialect dialect, String database, String table, TableSchema tableSchema) { + JdbcDialect dialect, + String database, + String table, + TableSchema tableSchema, + TableSchema databaseTableSchema) { String insertSQL = dialect.getInsertIntoStatement(database, table, tableSchema.getFieldNames()); - return createSimpleBufferedExecutor(insertSQL, tableSchema, dialect.getRowConverter()); + return createSimpleBufferedExecutor( + insertSQL, tableSchema, databaseTableSchema, dialect.getRowConverter()); } private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleBufferedExecutor( - String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) { + String sql, + TableSchema tableSchema, + TableSchema databaseTableSchema, + JdbcRowConverter rowConverter) { JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor = - createSimpleExecutor(sql, tableSchema, rowConverter); + createSimpleExecutor(sql, tableSchema, databaseTableSchema, rowConverter); return new BufferedBatchStatementExecutor(simpleRowExecutor, Function.identity()); } @@ -119,6 +134,7 @@ public class JdbcOutputFormatBuilder { String database, String table, TableSchema tableSchema, + TableSchema databaseTableSchema, String[] pkNames, boolean enableUpsert, boolean isPrimaryKeyUpdated, @@ -139,13 +155,15 @@ public class JdbcOutputFormatBuilder { Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = createKeyExtractor(pkFields); JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor = - createDeleteExecutor(dialect, database, table, pkNames, pkSchema); + createDeleteExecutor( + dialect, database, table, pkNames, pkSchema, databaseTableSchema); JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor = createUpsertExecutor( dialect, database, table, tableSchema, + databaseTableSchema, pkNames, pkSchema, keyExtractor, @@ -161,6 +179,7 @@ public class JdbcOutputFormatBuilder { String database, String table, TableSchema tableSchema, + TableSchema databaseTableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, @@ -168,7 +187,8 @@ public class JdbcOutputFormatBuilder { boolean isPrimaryKeyUpdated, boolean supportUpsertByInsertOnly) { if (supportUpsertByInsertOnly) { - return createInsertOnlyExecutor(dialect, database, table, tableSchema); + return createInsertOnlyExecutor( + dialect, database, table, tableSchema, databaseTableSchema); } if (enableUpsert) { Optional<String> upsertSQL = @@ -176,20 +196,30 @@ public class JdbcOutputFormatBuilder { database, table, tableSchema.getFieldNames(), pkNames); if (upsertSQL.isPresent()) { return createSimpleExecutor( - upsertSQL.get(), tableSchema, dialect.getRowConverter()); + upsertSQL.get(), + tableSchema, + databaseTableSchema, + dialect.getRowConverter()); } return createInsertOrUpdateByQueryExecutor( dialect, database, table, tableSchema, + databaseTableSchema, pkNames, pkTableSchema, keyExtractor, isPrimaryKeyUpdated); } return createInsertOrUpdateExecutor( - dialect, database, table, tableSchema, pkNames, isPrimaryKeyUpdated); + dialect, + database, + table, + tableSchema, + databaseTableSchema, + pkNames, + isPrimaryKeyUpdated); } private static JdbcBatchStatementExecutor<SeaTunnelRow> createCopyInBufferStatementExecutor( @@ -209,8 +239,11 @@ public class JdbcOutputFormatBuilder { } private static JdbcBatchStatementExecutor<SeaTunnelRow> createInsertOnlyExecutor( - JdbcDialect dialect, String database, String table, TableSchema tableSchema) { - + JdbcDialect dialect, + String database, + String table, + TableSchema tableSchema, + TableSchema databaseTableSchema) { return new SimpleBatchStatementExecutor( connection -> FieldNamedPreparedStatement.prepareStatement( @@ -219,6 +252,7 @@ public class JdbcOutputFormatBuilder { database, table, tableSchema.getFieldNames()), tableSchema.getFieldNames()), tableSchema, + databaseTableSchema, dialect.getRowConverter()); } @@ -227,6 +261,7 @@ public class JdbcOutputFormatBuilder { String database, String table, TableSchema tableSchema, + TableSchema databaseTableSchema, String[] pkNames, boolean isPrimaryKeyUpdated) { @@ -248,6 +283,7 @@ public class JdbcOutputFormatBuilder { isPrimaryKeyUpdated), tableSchema.getFieldNames()), tableSchema, + databaseTableSchema, dialect.getRowConverter()); } @@ -256,6 +292,7 @@ public class JdbcOutputFormatBuilder { String database, String table, TableSchema tableSchema, + TableSchema databaseTableSchema, String[] pkNames, TableSchema pkTableSchema, Function<SeaTunnelRow, SeaTunnelRow> keyExtractor, @@ -285,6 +322,7 @@ public class JdbcOutputFormatBuilder { pkTableSchema, keyExtractor, tableSchema, + databaseTableSchema, dialect.getRowConverter()); } @@ -293,18 +331,24 @@ public class JdbcOutputFormatBuilder { String database, String table, String[] pkNames, - TableSchema pkTableSchema) { + TableSchema pkTableSchema, + TableSchema databaseTableSchema) { String deleteSQL = dialect.getDeleteStatement(database, table, pkNames); - return createSimpleExecutor(deleteSQL, pkTableSchema, dialect.getRowConverter()); + return createSimpleExecutor( + deleteSQL, pkTableSchema, databaseTableSchema, dialect.getRowConverter()); } private static JdbcBatchStatementExecutor<SeaTunnelRow> createSimpleExecutor( - String sql, TableSchema tableSchema, JdbcRowConverter rowConverter) { + String sql, + TableSchema tableSchema, + TableSchema databaseTableSchema, + JdbcRowConverter rowConverter) { return new SimpleBatchStatementExecutor( connection -> FieldNamedPreparedStatement.prepareStatement( connection, sql, tableSchema.getFieldNames()), tableSchema, + databaseTableSchema, rowConverter); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index 691de6b77c..42bcf2d894 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -31,6 +31,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import lombok.extern.slf4j.Slf4j; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.sql.Array; import java.sql.Date; @@ -186,89 +188,37 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { public PreparedStatement toExternal( TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) throws SQLException { + return toExternal(tableSchema, null, row, statement); + } + + @Override + public PreparedStatement toExternal( + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) + throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { try { SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex); + String fieldName = rowType.getFieldName(fieldIndex); int statementIndex = fieldIndex + 1; Object fieldValue = row.getField(fieldIndex); if (fieldValue == null) { statement.setObject(statementIndex, null); continue; } - - switch (seaTunnelDataType.getSqlType()) { - case STRING: - statement.setString(statementIndex, (String) row.getField(fieldIndex)); - break; - case BOOLEAN: - statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex)); - break; - case TINYINT: - statement.setByte(statementIndex, (Byte) row.getField(fieldIndex)); - break; - case SMALLINT: - statement.setShort(statementIndex, (Short) row.getField(fieldIndex)); - break; - case INT: - statement.setInt(statementIndex, (Integer) row.getField(fieldIndex)); - break; - case BIGINT: - statement.setLong(statementIndex, (Long) row.getField(fieldIndex)); - break; - case FLOAT: - statement.setFloat(statementIndex, (Float) row.getField(fieldIndex)); - break; - case DOUBLE: - statement.setDouble(statementIndex, (Double) row.getField(fieldIndex)); - break; - case DECIMAL: - statement.setBigDecimal( - statementIndex, (BigDecimal) row.getField(fieldIndex)); - break; - case DATE: - LocalDate localDate = (LocalDate) row.getField(fieldIndex); - statement.setDate(statementIndex, java.sql.Date.valueOf(localDate)); - break; - case TIME: - writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex)); - break; - case TIMESTAMP: - LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex); - statement.setTimestamp( - statementIndex, java.sql.Timestamp.valueOf(localDateTime)); - break; - case BYTES: - statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex)); - break; - case NULL: - statement.setNull(statementIndex, java.sql.Types.NULL); - break; - case ARRAY: - SeaTunnelDataType elementType = - ((ArrayType) seaTunnelDataType).getElementType(); - Object[] array = (Object[]) row.getField(fieldIndex); - if (array == null) { - statement.setNull(statementIndex, java.sql.Types.ARRAY); - break; - } - if (SqlType.TINYINT.equals(elementType.getSqlType())) { - Short[] shortArray = new Short[array.length]; - for (int i = 0; i < array.length; i++) { - shortArray[i] = Short.valueOf(array[i].toString()); - } - statement.setObject(statementIndex, shortArray); - } else { - statement.setObject(statementIndex, array); - } - break; - case MAP: - case ROW: - default: - throw new JdbcConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, - "Unexpected value: " + seaTunnelDataType); + String sourceType = null; + if (databaseTableSchema != null && databaseTableSchema.contains(fieldName)) { + sourceType = databaseTableSchema.getColumn(fieldName).getSourceType(); } + setValueToStatementByDataType( + row.getField(fieldIndex), + statement, + seaTunnelDataType, + statementIndex, + sourceType); } catch (Exception e) { throw new JdbcConnectorException( JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED, @@ -279,6 +229,84 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter { return statement; } + protected void setValueToStatementByDataType( + Object value, + PreparedStatement statement, + SeaTunnelDataType<?> seaTunnelDataType, + int statementIndex, + @Nullable String sourceType) + throws SQLException { + switch (seaTunnelDataType.getSqlType()) { + case STRING: + statement.setString(statementIndex, (String) value); + break; + case BOOLEAN: + statement.setBoolean(statementIndex, (Boolean) value); + break; + case TINYINT: + statement.setByte(statementIndex, (Byte) value); + break; + case SMALLINT: + statement.setShort(statementIndex, (Short) value); + break; + case INT: + statement.setInt(statementIndex, (Integer) value); + break; + case BIGINT: + statement.setLong(statementIndex, (Long) value); + break; + case FLOAT: + statement.setFloat(statementIndex, (Float) value); + break; + case DOUBLE: + statement.setDouble(statementIndex, (Double) value); + break; + case DECIMAL: + statement.setBigDecimal(statementIndex, (BigDecimal) value); + break; + case DATE: + LocalDate localDate = (LocalDate) value; + statement.setDate(statementIndex, Date.valueOf(localDate)); + break; + case TIME: + writeTime(statement, statementIndex, (LocalTime) value); + break; + case TIMESTAMP: + LocalDateTime localDateTime = (LocalDateTime) value; + statement.setTimestamp(statementIndex, Timestamp.valueOf(localDateTime)); + break; + case BYTES: + statement.setBytes(statementIndex, (byte[]) value); + break; + case NULL: + statement.setNull(statementIndex, java.sql.Types.NULL); + break; + case ARRAY: + SeaTunnelDataType elementType = ((ArrayType) seaTunnelDataType).getElementType(); + Object[] array = (Object[]) value; + if (array == null) { + statement.setNull(statementIndex, java.sql.Types.ARRAY); + break; + } + if (SqlType.TINYINT.equals(elementType.getSqlType())) { + Short[] shortArray = new Short[array.length]; + for (int i = 0; i < array.length; i++) { + shortArray[i] = Short.valueOf(array[i].toString()); + } + statement.setObject(statementIndex, shortArray); + } else { + statement.setObject(statementIndex, array); + } + break; + case MAP: + case ROW: + default: + throw new JdbcConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType); + } + } + protected void writeTime(PreparedStatement statement, int index, LocalTime time) throws SQLException { statement.setTime(index, java.sql.Time.valueOf(time)); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java index a8c7c079d3..f3cec3996c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java @@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import javax.annotation.Nullable; + import java.io.Serializable; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -38,7 +40,18 @@ public interface JdbcRowConverter extends Serializable { */ SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException; + @Deprecated PreparedStatement toExternal( TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) throws SQLException; + + /** Convert data from internal {@link SeaTunnelRow} to JDBC object. */ + default PreparedStatement toExternal( + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) + throws SQLException { + return toExternal(tableSchema, row, statement); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java index 28f7fdf425..7433e66bb6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java @@ -24,6 +24,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorExc import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import javax.annotation.Nullable; + import java.sql.PreparedStatement; public class HiveJdbcRowConverter extends AbstractJdbcRowConverter { @@ -35,7 +37,10 @@ public class HiveJdbcRowConverter extends AbstractJdbcRowConverter { @Override public PreparedStatement toExternal( - TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) { + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) { throw new JdbcConnectorException( JdbcConnectorErrorCode.DONT_SUPPORT_SINK, "The Hive jdbc connector don't support sink"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java index 806788b30e..33c689fd39 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java @@ -31,6 +31,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.Hive import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.sql.PreparedStatement; import java.time.LocalDate; @@ -46,7 +48,10 @@ public class InceptorJdbcRowConverter extends HiveJdbcRowConverter { @Override public PreparedStatement toExternal( - TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) { + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { try { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java index 4a9411b99b..f766ce3980 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java @@ -27,6 +27,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.sql.Date; import java.sql.PreparedStatement; @@ -119,7 +121,10 @@ public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter { @Override public PreparedStatement toExternal( - TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java index 0a52e6a90b..5bf23ea1fe 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java @@ -34,6 +34,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Date; @@ -145,7 +147,10 @@ public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter { @Override public PreparedStatement toExternal( - TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java index 6e32cca436..6c74387f47 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java @@ -17,13 +17,43 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter.ORACLE_BLOB; + public class OracleJdbcRowConverter extends AbstractJdbcRowConverter { @Override public String converterName() { return DatabaseIdentifier.ORACLE; } + + @Override + protected void setValueToStatementByDataType( + Object value, + PreparedStatement statement, + SeaTunnelDataType<?> seaTunnelDataType, + int statementIndex, + @Nullable String sourceType) + throws SQLException { + if (seaTunnelDataType.getSqlType().equals(SqlType.BYTES)) { + if (ORACLE_BLOB.equals(sourceType)) { + statement.setBinaryStream(statementIndex, new ByteArrayInputStream((byte[]) value)); + } else { + statement.setBytes(statementIndex, (byte[]) value); + } + } else { + super.setValueToStatementByDataType( + value, statement, seaTunnelDataType, statementIndex, sourceType); + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index 071e8ec6e1..7fbb2f7782 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -33,6 +33,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils; import org.postgresql.util.PGobject; +import javax.annotation.Nullable; + import java.math.BigDecimal; import java.sql.Array; import java.sql.Date; @@ -158,7 +160,10 @@ public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter { @Override public PreparedStatement toExternal( - TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement) + TableSchema tableSchema, + @Nullable TableSchema databaseTableSchema, + SeaTunnelRow row, + PreparedStatement statement) throws SQLException { SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); String[] sourceTypes = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java index c98f50ba92..88e658fc38 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java @@ -336,7 +336,9 @@ public class FieldNamedPreparedStatement implements PreparedStatement { @Override public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { - throw new UnsupportedOperationException(); + for (int index : indexMapping[parameterIndex - 1]) { + statement.setBinaryStream(index, x); + } } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java index 9cf8b95863..ae08d027f7 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java @@ -26,6 +26,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow import lombok.NonNull; import lombok.RequiredArgsConstructor; +import javax.annotation.Nullable; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -42,6 +44,7 @@ public class InsertOrUpdateBatchStatementExecutor private final TableSchema keyTableSchema; private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor; @NonNull private final TableSchema valueTableSchema; + @Nullable private final TableSchema databaseTableSchema; @NonNull private final JdbcRowConverter rowConverter; private transient PreparedStatement existStatement; private transient PreparedStatement insertStatement; @@ -53,6 +56,7 @@ public class InsertOrUpdateBatchStatementExecutor StatementFactory insertStmtFactory, StatementFactory updateStmtFactory, TableSchema valueTableSchema, + TableSchema databaseTableSchema, JdbcRowConverter rowConverter) { this( null, @@ -61,6 +65,7 @@ public class InsertOrUpdateBatchStatementExecutor null, null, valueTableSchema, + databaseTableSchema, rowConverter); } @@ -81,14 +86,14 @@ public class InsertOrUpdateBatchStatementExecutor insertStatement.executeBatch(); insertStatement.clearBatch(); } - rowConverter.toExternal(valueTableSchema, record, updateStatement); + rowConverter.toExternal(valueTableSchema, databaseTableSchema, record, updateStatement); updateStatement.addBatch(); } else { if (preExistFlag != null && preExistFlag) { updateStatement.executeBatch(); updateStatement.clearBatch(); } - rowConverter.toExternal(valueTableSchema, record, insertStatement); + rowConverter.toExternal(valueTableSchema, databaseTableSchema, record, insertStatement); insertStatement.addBatch(); } @@ -147,7 +152,7 @@ public class InsertOrUpdateBatchStatementExecutor } private boolean exist(SeaTunnelRow pk) throws SQLException { - rowConverter.toExternal(keyTableSchema, pk, existStatement); + rowConverter.toExternal(keyTableSchema, databaseTableSchema, pk, existStatement); try (ResultSet resultSet = existStatement.executeQuery()) { return resultSet.next(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java index a2f0add260..c7e6f8e2ce 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java @@ -24,6 +24,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow import lombok.NonNull; import lombok.RequiredArgsConstructor; +import javax.annotation.Nullable; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -32,6 +34,7 @@ import java.sql.SQLException; public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor<SeaTunnelRow> { @NonNull private final StatementFactory statementFactory; @NonNull private final TableSchema tableSchema; + @Nullable private final TableSchema databaseTableSchema; @NonNull private final JdbcRowConverter converter; private transient PreparedStatement statement; @@ -42,7 +45,7 @@ public class SimpleBatchStatementExecutor implements JdbcBatchStatementExecutor< @Override public void addToBatch(SeaTunnelRow record) throws SQLException { - converter.toExternal(tableSchema, record, statement); + converter.toExternal(tableSchema, databaseTableSchema, record, statement); statement.addBatch(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java index 7ab289edc1..521b65ed94 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java @@ -50,6 +50,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT> protected JdbcDialect dialect; protected TablePath sinkTablePath; protected TableSchema tableSchema; + protected TableSchema databaseTableSchema; protected transient boolean isOpen; protected JdbcConnectionProvider connectionProvider; protected JdbcSinkConfig jdbcSinkConfig; @@ -76,7 +77,11 @@ public abstract class AbstractJdbcSinkWriter<ResourceT> } this.outputFormat = new JdbcOutputFormatBuilder( - dialect, connectionProvider, jdbcSinkConfig, tableSchema) + dialect, + connectionProvider, + jdbcSinkConfig, + tableSchema, + databaseTableSchema) .build(); this.outputFormat.open(); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java index d14cf59211..874d04135f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java @@ -77,6 +77,7 @@ public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSinkWriter<Void> { JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, + TableSchema databaseTableSchema, List<JdbcSinkState> states) { checkArgument( jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0, @@ -95,7 +96,9 @@ public class JdbcExactlyOnceSinkWriter extends AbstractJdbcSinkWriter<Void> { XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig()); this.xaFacade = (XaFacade) this.connectionProvider; this.outputFormat = - new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, tableSchema).build(); + new JdbcOutputFormatBuilder( + dialect, xaFacade, jdbcSinkConfig, tableSchema, databaseTableSchema) + .build(); this.xaGroupOps = new XaGroupOpsImpl(xaFacade); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 2213a90808..25fc39005d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -34,6 +34,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.schema.SchemaChangeType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog; @@ -52,6 +53,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils; import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -60,6 +63,7 @@ import java.util.Optional; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +@Slf4j public class JdbcSink implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>, SupportSaveMode, @@ -105,6 +109,11 @@ public class JdbcSink @Override public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) { + try { + Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } TablePath sinkTablePath = catalogTable.getTablePath(); AbstractJdbcSinkWriter sinkWriter; if (jdbcSinkConfig.isExactlyOnce()) { @@ -116,18 +125,30 @@ public class JdbcSink dialect, jdbcSinkConfig, tableSchema, + getDatabaseTableSchema().orElse(null), new ArrayList<>()); } else { - if (catalogTable != null && catalogTable.getTableSchema().getPrimaryKey() != null) { + if (catalogTable.getTableSchema().getPrimaryKey() != null) { String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0); int index = tableSchema.toPhysicalRowDataType().indexOf(keyName); if (index > -1) { return new JdbcSinkWriter( - sinkTablePath, dialect, jdbcSinkConfig, tableSchema, index); + sinkTablePath, + dialect, + jdbcSinkConfig, + tableSchema, + getDatabaseTableSchema().orElse(null), + index); } } sinkWriter = - new JdbcSinkWriter(sinkTablePath, dialect, jdbcSinkConfig, tableSchema, null); + new JdbcSinkWriter( + sinkTablePath, + dialect, + jdbcSinkConfig, + tableSchema, + getDatabaseTableSchema().orElse(null), + null); } return sinkWriter; } @@ -135,6 +156,11 @@ public class JdbcSink @Override public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter( SinkWriter.Context context, List<JdbcSinkState> states) throws IOException { + try { + Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } TablePath sinkTablePath = catalogTable.getTablePath(); if (jdbcSinkConfig.isExactlyOnce()) { return new JdbcExactlyOnceSinkWriter( @@ -144,11 +170,37 @@ public class JdbcSink dialect, jdbcSinkConfig, tableSchema, + getDatabaseTableSchema().orElse(null), states); } return SeaTunnelSink.super.restoreWriter(context, states); } + private Optional<TableSchema> getDatabaseTableSchema() { + Optional<Catalog> catalogOptional = getCatalog(); + FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE); + String fieldIde = + fieldIdeEnumEnum == null + ? FieldIdeEnum.ORIGINAL.getValue() + : fieldIdeEnumEnum.getValue(); + TablePath tablePath = + TablePath.of( + catalogTable.getTableId().getDatabaseName(), + catalogTable.getTableId().getSchemaName(), + CatalogUtils.quoteTableIdentifier( + catalogTable.getTableId().getTableName(), fieldIde)); + if (catalogOptional.isPresent()) { + try (Catalog catalog = catalogOptional.get()) { + catalog.open(); + return Optional.of(catalog.getTable(tablePath).getTableSchema()); + } catch (TableNotExistException e) { + log.warn("table {} not exist when get the database catalog table", tablePath); + return Optional.empty(); + } + } + return Optional.empty(); + } + @Override public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>> createAggregatedCommitter() { @@ -187,18 +239,7 @@ public class JdbcSink throw new RuntimeException(e); } if (catalogTable != null) { - if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) { - return Optional.empty(); - } - if (StringUtils.isBlank(jdbcSinkConfig.getTable())) { - return Optional.empty(); - } - // use query to write data can not support savemode - if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) { - return Optional.empty(); - } - Optional<Catalog> catalogOptional = - JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect); + Optional<Catalog> catalogOptional = getCatalog(); if (catalogOptional.isPresent()) { try { Catalog catalog = catalogOptional.get(); @@ -242,6 +283,20 @@ public class JdbcSink return Optional.empty(); } + private Optional<Catalog> getCatalog() { + if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) { + return Optional.empty(); + } + if (StringUtils.isBlank(jdbcSinkConfig.getTable())) { + return Optional.empty(); + } + // use query to write data can not support get catalog + if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) { + return Optional.empty(); + } + return JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect); + } + @Override public Optional<CatalogTable> getWriteCatalogTable() { return Optional.ofNullable(catalogTable); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index 3f43b2088d..d3c9949dc3 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -49,17 +49,23 @@ public class JdbcSinkWriter extends AbstractJdbcSinkWriter<ConnectionPoolManager JdbcDialect dialect, JdbcSinkConfig jdbcSinkConfig, TableSchema tableSchema, + TableSchema databaseTableSchema, Integer primaryKeyIndex) { this.sinkTablePath = sinkTablePath; this.dialect = dialect; this.tableSchema = tableSchema; + this.databaseTableSchema = databaseTableSchema; this.jdbcSinkConfig = jdbcSinkConfig; this.primaryKeyIndex = primaryKeyIndex; this.connectionProvider = dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig()); this.outputFormat = new JdbcOutputFormatBuilder( - dialect, connectionProvider, jdbcSinkConfig, tableSchema) + dialect, + connectionProvider, + jdbcSinkConfig, + tableSchema, + databaseTableSchema) .build(); } @@ -97,7 +103,11 @@ public class JdbcSinkWriter extends AbstractJdbcSinkWriter<ConnectionPoolManager queueIndex); this.outputFormat = new JdbcOutputFormatBuilder( - dialect, connectionProvider, jdbcSinkConfig, tableSchema) + dialect, + connectionProvider, + jdbcSinkConfig, + tableSchema, + databaseTableSchema) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index b0195837cb..7b21e27364 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -44,6 +44,7 @@ import org.testcontainers.utility.DockerLoggerFactory; import org.testcontainers.utility.MountableFile; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Statement; import java.sql.Timestamp; @@ -53,6 +54,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class JdbcOracleIT extends AbstractJdbcIT { @@ -81,6 +84,7 @@ public class JdbcOracleIT extends AbstractJdbcIT { + " VARCHAR_10_COL varchar2(10),\n" + " CHAR_10_COL char(10),\n" + " CLOB_COL clob,\n" + + " BLOB_COL blob,\n" + " NUMBER_1 number(1),\n" + " NUMBER_6 number(6),\n" + " NUMBER_10 number(10),\n" @@ -104,6 +108,7 @@ public class JdbcOracleIT extends AbstractJdbcIT { + " VARCHAR_10_COL varchar2(10),\n" + " CHAR_10_COL char(10),\n" + " CLOB_COL clob,\n" + + " BLOB_COL blob,\n" + " NUMBER_1 number(1),\n" + " NUMBER_6 number(6),\n" + " NUMBER_10 number(10),\n" @@ -125,6 +130,7 @@ public class JdbcOracleIT extends AbstractJdbcIT { "VARCHAR_10_COL", "CHAR_10_COL", "CLOB_COL", + "BLOB_COL", "NUMBER_1", "NUMBER_6", "NUMBER_10", @@ -230,6 +236,11 @@ public class JdbcOracleIT extends AbstractJdbcIT { String.format("f%s", i), String.format("f%s", i), String.format("f%s", i), + // set value bytes more than 4000bytes + IntStream.range(0, 4000) + .mapToObj(d -> d + "") + .collect(Collectors.joining(",")) + .getBytes(StandardCharsets.UTF_8), 1, i * 10, i * 1000, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf index 4df8c7b993..d2e06158b8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf @@ -30,7 +30,7 @@ source { url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER" user = testUser password = testPassword - query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE" + query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE" properties { database.oracle.jdbc.timezoneAsRegion = "false" } @@ -46,7 +46,7 @@ sink { url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER" user = testUser password = testPassword - query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" properties { database.oracle.jdbc.timezoneAsRegion = "false" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf index 1988b48872..33cf33638f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf @@ -31,7 +31,7 @@ source { user = testUser password = testPassword use_select_count = true - query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE" + query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE" properties { database.oracle.jdbc.timezoneAsRegion = "false" } @@ -47,7 +47,7 @@ sink { url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER" user = testUser password = testPassword - query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" properties { database.oracle.jdbc.timezoneAsRegion = "false" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf index 4d01da5c72..e9e997ea90 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf @@ -47,7 +47,7 @@ sink { url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER" user = testUser password = testPassword - query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" properties { database.oracle.jdbc.timezoneAsRegion = "false" } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf index 94a850fdd0..2be0b51224 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf @@ -48,7 +48,7 @@ sink { url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER" user = testUser password = testPassword - query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" properties { database.oracle.jdbc.timezoneAsRegion = "false" }