This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1de056a9a4 [Improve][Connector-V2] The interface supports jdbc respects the target database field type (#8031)
1de056a9a4 is described below

commit 1de056a9a4b3fb96106ac6e8f290b21818cd775e
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Dec 5 13:26:41 2024 +0800

    [Improve][Connector-V2] The interface supports jdbc respects the target database field type (#8031)
---
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   |  97 +++++++-----
 .../jdbc/internal/JdbcOutputFormatBuilder.java     |  72 +++++++--
 .../converter/AbstractJdbcRowConverter.java        | 172 ++++++++++++---------
 .../jdbc/internal/converter/JdbcRowConverter.java  |  13 ++
 .../dialect/hive/HiveJdbcRowConverter.java         |   7 +-
 .../dialect/inceptor/InceptorJdbcRowConverter.java |   7 +-
 .../dialect/kingbase/KingbaseJdbcRowConverter.java |   7 +-
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  |   7 +-
 .../dialect/oracle/OracleJdbcRowConverter.java     |  30 ++++
 .../dialect/psql/PostgresJdbcRowConverter.java     |   7 +-
 .../executor/FieldNamedPreparedStatement.java      |   4 +-
 .../InsertOrUpdateBatchStatementExecutor.java      |  11 +-
 .../executor/SimpleBatchStatementExecutor.java     |   5 +-
 .../jdbc/sink/AbstractJdbcSinkWriter.java          |   7 +-
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       |   5 +-
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |  85 ++++++++--
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  14 +-
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    |  11 ++
 .../test/resources/jdbc_oracle_source_to_sink.conf |   4 +-
 .../jdbc_oracle_source_to_sink_use_select1.conf    |   4 +-
 .../jdbc_oracle_source_to_sink_use_select2.conf    |   2 +-
 .../jdbc_oracle_source_to_sink_use_select3.conf    |   2 +-
 22 files changed, 416 insertions(+), 157 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 04b15dfe1c..ba237d3bfd 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -21,7 +21,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
@@ -34,7 +33,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTy
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
-import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -44,6 +42,23 @@ import java.util.List;
 @Slf4j
 public class DamengCatalog extends AbstractJdbcCatalog {
 
+    private static final String SELECT_COLUMNS_SQL =
+            "SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
+                    + ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, COMMENTS.COMMENTS ,"
+                    + "CASE \n"
+                    + "        WHEN COLUMNS.DATA_TYPE IN ('CHAR', 'CHARACTER', 'VARCHAR', 'VARCHAR2', 'VARBINARY', 'BINARY') THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_LENGTH || ')'\n"
+                    + "        WHEN COLUMNS.DATA_TYPE IN ('NUMERIC', 'DECIMAL', 'NUMBER') AND COLUMNS.DATA_PRECISION IS NOT NULL AND COLUMNS.DATA_SCALE IS NOT NULL AND COLUMNS.DATA_PRECISION != 0 AND COLUMNS.DATA_SCALE != 0 THEN COLUMNS.DATA_TYPE || '(' || COLUMNS.DATA_PRECISION || ', ' || COLUMNS.DATA_SCALE || ')'\n"
+                    + "        ELSE COLUMNS.DATA_TYPE\n"
+                    + "    END AS SOURCE_TYPE \n"
+                    + "FROM ALL_TAB_COLUMNS COLUMNS "
+                    + "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
+                    + "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
+                    + "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
+                    + "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
+                    + "WHERE COLUMNS.OWNER = '%s' "
+                    + "AND COLUMNS.TABLE_NAME = '%s' "
+                    + "ORDER BY COLUMNS.COLUMN_ID ASC";
+
     public DamengCatalog(
             String catalogName,
             String username,
@@ -53,6 +68,23 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         super(catalogName, username, pwd, urlInfo, defaultSchema);
     }
 
+    @Override
+    protected void createDatabaseInternal(String databaseName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void dropDatabaseInternal(String databaseName) throws 
CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getExistDataSql(TablePath tablePath) {
+        return String.format(
+                "select * from \"%s\".\"%s\" LIMIT 1",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
     @Override
     protected String getDatabaseWithConditionSql(String databaseName) {
         return String.format(getListDatabaseSql() + " where name = '%s'", 
databaseName);
@@ -98,19 +130,34 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return rs.getString(1) + "." + rs.getString(2);
     }
 
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL, tablePath.getSchemaName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        return String.format(
+                "TRUNCATE TABLE \"%s\".\"%s\"",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
     @Override
     protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
-        String typeName = resultSet.getString("TYPE_NAME");
-        Long columnLength = resultSet.getLong("COLUMN_SIZE");
-        Long columnPrecision = columnLength;
-        Integer columnScale = resultSet.getObject("DECIMAL_DIGITS", 
Integer.class);
-        String columnComment = resultSet.getString("REMARKS");
-        Object defaultValue = resultSet.getObject("COLUMN_DEF");
-        boolean isNullable = (resultSet.getInt("NULLABLE") == 
DatabaseMetaData.columnNullable);
+        String typeName = resultSet.getString("DATA_TYPE");
+        long columnLength = resultSet.getLong("DATA_LENGTH");
+        long columnPrecision = resultSet.getLong("DATA_PRECISION");
+        int columnScale = resultSet.getInt("DATA_SCALE");
+        String columnComment = resultSet.getString("COMMENTS");
+        Object defaultValue = resultSet.getObject("DATA_DEFAULT");
+        boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
+
         BasicTypeDefine typeDefine =
                 BasicTypeDefine.builder()
                         .name(columnName)
+                        .columnType(typeName)
                         .dataType(typeName)
                         .length(columnLength)
                         .precision(columnPrecision)
@@ -132,6 +179,11 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return tablePath.getSchemaAndTableName();
     }
 
+    private List<String> listTables() {
+        List<String> databases = listDatabases();
+        return listTables(databases.get(0));
+    }
+
     @Override
     public List<String> listTables(String databaseName)
             throws CatalogException, DatabaseNotExistException {
@@ -161,31 +213,4 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         Connection defaultConnection = getConnection(defaultUrl);
         return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
DmdbTypeMapper());
     }
-
-    @Override
-    protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
-            TablePath tablePath, Connection conn) throws SQLException {
-        TableSchema.Builder columnsBuilder = TableSchema.builder();
-        DatabaseMetaData metaData = conn.getMetaData();
-        try (ResultSet resultSet =
-                metaData.getColumns(
-                        null, tablePath.getSchemaName(), 
tablePath.getTableName(), null)) {
-            buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
-        }
-        return columnsBuilder;
-    }
-
-    @Override
-    protected String getTruncateTableSql(TablePath tablePath) {
-        return String.format(
-                "TRUNCATE TABLE \"%s\".\"%s\"",
-                tablePath.getSchemaName(), tablePath.getTableName());
-    }
-
-    @Override
-    protected String getExistDataSql(TablePath tablePath) {
-        return String.format(
-                "select * from \"%s\".\"%s\" WHERE rownum = 1",
-                tablePath.getSchemaName(), tablePath.getTableName());
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index dee1b58e0e..7748823ca4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -39,6 +39,8 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -53,6 +55,7 @@ public class JdbcOutputFormatBuilder {
     @NonNull private final JdbcConnectionProvider connectionProvider;
     @NonNull private final JdbcSinkConfig jdbcSinkConfig;
     @NonNull private final TableSchema tableSchema;
+    @Nullable private final TableSchema databaseTableSchema;
 
     public JdbcOutputFormat build() {
         JdbcOutputFormat.StatementExecutorFactory statementExecutorFactory;
@@ -76,10 +79,13 @@ public class JdbcOutputFormatBuilder {
                             createSimpleBufferedExecutor(
                                     jdbcSinkConfig.getSimpleSql(),
                                     tableSchema,
+                                    databaseTableSchema,
                                     dialect.getRowConverter());
         } else if (primaryKeys == null || primaryKeys.isEmpty()) {
             statementExecutorFactory =
-                    () -> createSimpleBufferedExecutor(dialect, database, 
table, tableSchema);
+                    () ->
+                            createSimpleBufferedExecutor(
+                                    dialect, database, table, tableSchema, 
databaseTableSchema);
         } else {
             statementExecutorFactory =
                     () ->
@@ -88,6 +94,7 @@ public class JdbcOutputFormatBuilder {
                                     database,
                                     table,
                                     tableSchema,
+                                    databaseTableSchema,
                                     primaryKeys.toArray(new String[0]),
                                     jdbcSinkConfig.isEnableUpsert(),
                                     jdbcSinkConfig.isPrimaryKeyUpdated(),
@@ -101,16 +108,24 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createSimpleBufferedExecutor(
-            JdbcDialect dialect, String database, String table, TableSchema 
tableSchema) {
+            JdbcDialect dialect,
+            String database,
+            String table,
+            TableSchema tableSchema,
+            TableSchema databaseTableSchema) {
         String insertSQL =
                 dialect.getInsertIntoStatement(database, table, 
tableSchema.getFieldNames());
-        return createSimpleBufferedExecutor(insertSQL, tableSchema, 
dialect.getRowConverter());
+        return createSimpleBufferedExecutor(
+                insertSQL, tableSchema, databaseTableSchema, 
dialect.getRowConverter());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createSimpleBufferedExecutor(
-            String sql, TableSchema tableSchema, JdbcRowConverter 
rowConverter) {
+            String sql,
+            TableSchema tableSchema,
+            TableSchema databaseTableSchema,
+            JdbcRowConverter rowConverter) {
         JdbcBatchStatementExecutor<SeaTunnelRow> simpleRowExecutor =
-                createSimpleExecutor(sql, tableSchema, rowConverter);
+                createSimpleExecutor(sql, tableSchema, databaseTableSchema, 
rowConverter);
         return new BufferedBatchStatementExecutor(simpleRowExecutor, 
Function.identity());
     }
 
@@ -119,6 +134,7 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             String[] pkNames,
             boolean enableUpsert,
             boolean isPrimaryKeyUpdated,
@@ -139,13 +155,15 @@ public class JdbcOutputFormatBuilder {
 
         Function<SeaTunnelRow, SeaTunnelRow> keyExtractor = 
createKeyExtractor(pkFields);
         JdbcBatchStatementExecutor<SeaTunnelRow> deleteExecutor =
-                createDeleteExecutor(dialect, database, table, pkNames, 
pkSchema);
+                createDeleteExecutor(
+                        dialect, database, table, pkNames, pkSchema, 
databaseTableSchema);
         JdbcBatchStatementExecutor<SeaTunnelRow> upsertExecutor =
                 createUpsertExecutor(
                         dialect,
                         database,
                         table,
                         tableSchema,
+                        databaseTableSchema,
                         pkNames,
                         pkSchema,
                         keyExtractor,
@@ -161,6 +179,7 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             String[] pkNames,
             TableSchema pkTableSchema,
             Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
@@ -168,7 +187,8 @@ public class JdbcOutputFormatBuilder {
             boolean isPrimaryKeyUpdated,
             boolean supportUpsertByInsertOnly) {
         if (supportUpsertByInsertOnly) {
-            return createInsertOnlyExecutor(dialect, database, table, 
tableSchema);
+            return createInsertOnlyExecutor(
+                    dialect, database, table, tableSchema, 
databaseTableSchema);
         }
         if (enableUpsert) {
             Optional<String> upsertSQL =
@@ -176,20 +196,30 @@ public class JdbcOutputFormatBuilder {
                             database, table, tableSchema.getFieldNames(), 
pkNames);
             if (upsertSQL.isPresent()) {
                 return createSimpleExecutor(
-                        upsertSQL.get(), tableSchema, 
dialect.getRowConverter());
+                        upsertSQL.get(),
+                        tableSchema,
+                        databaseTableSchema,
+                        dialect.getRowConverter());
             }
             return createInsertOrUpdateByQueryExecutor(
                     dialect,
                     database,
                     table,
                     tableSchema,
+                    databaseTableSchema,
                     pkNames,
                     pkTableSchema,
                     keyExtractor,
                     isPrimaryKeyUpdated);
         }
         return createInsertOrUpdateExecutor(
-                dialect, database, table, tableSchema, pkNames, 
isPrimaryKeyUpdated);
+                dialect,
+                database,
+                table,
+                tableSchema,
+                databaseTableSchema,
+                pkNames,
+                isPrimaryKeyUpdated);
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createCopyInBufferStatementExecutor(
@@ -209,8 +239,11 @@ public class JdbcOutputFormatBuilder {
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createInsertOnlyExecutor(
-            JdbcDialect dialect, String database, String table, TableSchema 
tableSchema) {
-
+            JdbcDialect dialect,
+            String database,
+            String table,
+            TableSchema tableSchema,
+            TableSchema databaseTableSchema) {
         return new SimpleBatchStatementExecutor(
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
@@ -219,6 +252,7 @@ public class JdbcOutputFormatBuilder {
                                         database, table, 
tableSchema.getFieldNames()),
                                 tableSchema.getFieldNames()),
                 tableSchema,
+                databaseTableSchema,
                 dialect.getRowConverter());
     }
 
@@ -227,6 +261,7 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             String[] pkNames,
             boolean isPrimaryKeyUpdated) {
 
@@ -248,6 +283,7 @@ public class JdbcOutputFormatBuilder {
                                         isPrimaryKeyUpdated),
                                 tableSchema.getFieldNames()),
                 tableSchema,
+                databaseTableSchema,
                 dialect.getRowConverter());
     }
 
@@ -256,6 +292,7 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             String[] pkNames,
             TableSchema pkTableSchema,
             Function<SeaTunnelRow, SeaTunnelRow> keyExtractor,
@@ -285,6 +322,7 @@ public class JdbcOutputFormatBuilder {
                 pkTableSchema,
                 keyExtractor,
                 tableSchema,
+                databaseTableSchema,
                 dialect.getRowConverter());
     }
 
@@ -293,18 +331,24 @@ public class JdbcOutputFormatBuilder {
             String database,
             String table,
             String[] pkNames,
-            TableSchema pkTableSchema) {
+            TableSchema pkTableSchema,
+            TableSchema databaseTableSchema) {
         String deleteSQL = dialect.getDeleteStatement(database, table, 
pkNames);
-        return createSimpleExecutor(deleteSQL, pkTableSchema, 
dialect.getRowConverter());
+        return createSimpleExecutor(
+                deleteSQL, pkTableSchema, databaseTableSchema, 
dialect.getRowConverter());
     }
 
     private static JdbcBatchStatementExecutor<SeaTunnelRow> 
createSimpleExecutor(
-            String sql, TableSchema tableSchema, JdbcRowConverter 
rowConverter) {
+            String sql,
+            TableSchema tableSchema,
+            TableSchema databaseTableSchema,
+            JdbcRowConverter rowConverter) {
         return new SimpleBatchStatementExecutor(
                 connection ->
                         FieldNamedPreparedStatement.prepareStatement(
                                 connection, sql, tableSchema.getFieldNames()),
                 tableSchema,
+                databaseTableSchema,
                 rowConverter);
     }
 
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 691de6b77c..42bcf2d894 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.Array;
 import java.sql.Date;
@@ -186,89 +188,37 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
     public PreparedStatement toExternal(
             TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement)
             throws SQLException {
+        return toExternal(tableSchema, null, row, statement);
+    }
+
+    @Override
+    public PreparedStatement toExternal(
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement)
+            throws SQLException {
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
             try {
                 SeaTunnelDataType<?> seaTunnelDataType = 
rowType.getFieldType(fieldIndex);
+                String fieldName = rowType.getFieldName(fieldIndex);
                 int statementIndex = fieldIndex + 1;
                 Object fieldValue = row.getField(fieldIndex);
                 if (fieldValue == null) {
                     statement.setObject(statementIndex, null);
                     continue;
                 }
-
-                switch (seaTunnelDataType.getSqlType()) {
-                    case STRING:
-                        statement.setString(statementIndex, (String) 
row.getField(fieldIndex));
-                        break;
-                    case BOOLEAN:
-                        statement.setBoolean(statementIndex, (Boolean) 
row.getField(fieldIndex));
-                        break;
-                    case TINYINT:
-                        statement.setByte(statementIndex, (Byte) 
row.getField(fieldIndex));
-                        break;
-                    case SMALLINT:
-                        statement.setShort(statementIndex, (Short) 
row.getField(fieldIndex));
-                        break;
-                    case INT:
-                        statement.setInt(statementIndex, (Integer) 
row.getField(fieldIndex));
-                        break;
-                    case BIGINT:
-                        statement.setLong(statementIndex, (Long) 
row.getField(fieldIndex));
-                        break;
-                    case FLOAT:
-                        statement.setFloat(statementIndex, (Float) 
row.getField(fieldIndex));
-                        break;
-                    case DOUBLE:
-                        statement.setDouble(statementIndex, (Double) 
row.getField(fieldIndex));
-                        break;
-                    case DECIMAL:
-                        statement.setBigDecimal(
-                                statementIndex, (BigDecimal) 
row.getField(fieldIndex));
-                        break;
-                    case DATE:
-                        LocalDate localDate = (LocalDate) 
row.getField(fieldIndex);
-                        statement.setDate(statementIndex, 
java.sql.Date.valueOf(localDate));
-                        break;
-                    case TIME:
-                        writeTime(statement, statementIndex, (LocalTime) 
row.getField(fieldIndex));
-                        break;
-                    case TIMESTAMP:
-                        LocalDateTime localDateTime = (LocalDateTime) 
row.getField(fieldIndex);
-                        statement.setTimestamp(
-                                statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
-                        break;
-                    case BYTES:
-                        statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
-                        break;
-                    case NULL:
-                        statement.setNull(statementIndex, java.sql.Types.NULL);
-                        break;
-                    case ARRAY:
-                        SeaTunnelDataType elementType =
-                                ((ArrayType) 
seaTunnelDataType).getElementType();
-                        Object[] array = (Object[]) row.getField(fieldIndex);
-                        if (array == null) {
-                            statement.setNull(statementIndex, 
java.sql.Types.ARRAY);
-                            break;
-                        }
-                        if (SqlType.TINYINT.equals(elementType.getSqlType())) {
-                            Short[] shortArray = new Short[array.length];
-                            for (int i = 0; i < array.length; i++) {
-                                shortArray[i] = 
Short.valueOf(array[i].toString());
-                            }
-                            statement.setObject(statementIndex, shortArray);
-                        } else {
-                            statement.setObject(statementIndex, array);
-                        }
-                        break;
-                    case MAP:
-                    case ROW:
-                    default:
-                        throw new JdbcConnectorException(
-                                
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                                "Unexpected value: " + seaTunnelDataType);
+                String sourceType = null;
+                if (databaseTableSchema != null && 
databaseTableSchema.contains(fieldName)) {
+                    sourceType = 
databaseTableSchema.getColumn(fieldName).getSourceType();
                 }
+                setValueToStatementByDataType(
+                        row.getField(fieldIndex),
+                        statement,
+                        seaTunnelDataType,
+                        statementIndex,
+                        sourceType);
             } catch (Exception e) {
                 throw new JdbcConnectorException(
                         JdbcConnectorErrorCode.DATA_TYPE_CAST_FAILED,
@@ -279,6 +229,84 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
         return statement;
     }
 
+    protected void setValueToStatementByDataType(
+            Object value,
+            PreparedStatement statement,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            int statementIndex,
+            @Nullable String sourceType)
+            throws SQLException {
+        switch (seaTunnelDataType.getSqlType()) {
+            case STRING:
+                statement.setString(statementIndex, (String) value);
+                break;
+            case BOOLEAN:
+                statement.setBoolean(statementIndex, (Boolean) value);
+                break;
+            case TINYINT:
+                statement.setByte(statementIndex, (Byte) value);
+                break;
+            case SMALLINT:
+                statement.setShort(statementIndex, (Short) value);
+                break;
+            case INT:
+                statement.setInt(statementIndex, (Integer) value);
+                break;
+            case BIGINT:
+                statement.setLong(statementIndex, (Long) value);
+                break;
+            case FLOAT:
+                statement.setFloat(statementIndex, (Float) value);
+                break;
+            case DOUBLE:
+                statement.setDouble(statementIndex, (Double) value);
+                break;
+            case DECIMAL:
+                statement.setBigDecimal(statementIndex, (BigDecimal) value);
+                break;
+            case DATE:
+                LocalDate localDate = (LocalDate) value;
+                statement.setDate(statementIndex, Date.valueOf(localDate));
+                break;
+            case TIME:
+                writeTime(statement, statementIndex, (LocalTime) value);
+                break;
+            case TIMESTAMP:
+                LocalDateTime localDateTime = (LocalDateTime) value;
+                statement.setTimestamp(statementIndex, 
Timestamp.valueOf(localDateTime));
+                break;
+            case BYTES:
+                statement.setBytes(statementIndex, (byte[]) value);
+                break;
+            case NULL:
+                statement.setNull(statementIndex, java.sql.Types.NULL);
+                break;
+            case ARRAY:
+                SeaTunnelDataType elementType = ((ArrayType) 
seaTunnelDataType).getElementType();
+                Object[] array = (Object[]) value;
+                if (array == null) {
+                    statement.setNull(statementIndex, java.sql.Types.ARRAY);
+                    break;
+                }
+                if (SqlType.TINYINT.equals(elementType.getSqlType())) {
+                    Short[] shortArray = new Short[array.length];
+                    for (int i = 0; i < array.length; i++) {
+                        shortArray[i] = Short.valueOf(array[i].toString());
+                    }
+                    statement.setObject(statementIndex, shortArray);
+                } else {
+                    statement.setObject(statementIndex, array);
+                }
+                break;
+            case MAP:
+            case ROW:
+            default:
+                throw new JdbcConnectorException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
+                        "Unexpected value: " + seaTunnelDataType);
+        }
+    }
+
     protected void writeTime(PreparedStatement statement, int index, LocalTime 
time)
             throws SQLException {
         statement.setTime(index, java.sql.Time.valueOf(time));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
index a8c7c079d3..f3cec3996c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/JdbcRowConverter.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -38,7 +40,18 @@ public interface JdbcRowConverter extends Serializable {
      */
     SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQLException;
 
+    @Deprecated
     PreparedStatement toExternal(
             TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
             throws SQLException;
+
+    /** Convert data from internal {@link SeaTunnelRow} to JDBC object. */
+    default PreparedStatement toExternal(
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement)
+            throws SQLException {
+        return toExternal(tableSchema, row, statement);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
index 28f7fdf425..7433e66bb6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorExc
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 
+import javax.annotation.Nullable;
+
 import java.sql.PreparedStatement;
 
 public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -35,7 +37,10 @@ public class HiveJdbcRowConverter extends 
AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement) {
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement) {
         throw new JdbcConnectorException(
                 JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
                 "The Hive jdbc connector don't support sink");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
index 806788b30e..33c689fd39 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/inceptor/InceptorJdbcRowConverter.java
@@ -31,6 +31,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.Hive
 
 import org.apache.commons.lang3.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.PreparedStatement;
 import java.time.LocalDate;
@@ -46,7 +48,10 @@ public class InceptorJdbcRowConverter extends 
HiveJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement) {
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement) {
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
             try {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index 4a9411b99b..f766ce3980 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -27,6 +27,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.PreparedStatement;
@@ -119,7 +121,10 @@ public class KingbaseJdbcRowConverter extends 
AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement)
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement)
             throws SQLException {
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 0a52e6a90b..5bf23ea1fe 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -34,6 +34,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
@@ -145,7 +147,10 @@ public class OceanBaseMysqlJdbcRowConverter extends 
AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement)
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement)
             throws SQLException {
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); 
fieldIndex++) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
index 6e32cca436..6c74387f47 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleJdbcRowConverter.java
@@ -17,13 +17,43 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 
+import javax.annotation.Nullable;
+
+import java.io.ByteArrayInputStream;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter.ORACLE_BLOB;
+
 public class OracleJdbcRowConverter extends AbstractJdbcRowConverter {
 
     @Override
     public String converterName() {
         return DatabaseIdentifier.ORACLE;
     }
+
+    @Override
+    protected void setValueToStatementByDataType(
+            Object value,
+            PreparedStatement statement,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            int statementIndex,
+            @Nullable String sourceType)
+            throws SQLException {
+        if (seaTunnelDataType.getSqlType().equals(SqlType.BYTES)) {
+            if (ORACLE_BLOB.equals(sourceType)) {
+                statement.setBinaryStream(statementIndex, new ByteArrayInputStream((byte[]) value));
+            } else {
+                statement.setBytes(statementIndex, (byte[]) value);
+            }
+        } else {
+            super.setValueToStatementByDataType(
+                    value, statement, seaTunnelDataType, statementIndex, sourceType);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 071e8ec6e1..7fbb2f7782 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -33,6 +33,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
 import org.postgresql.util.PGobject;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.sql.Array;
 import java.sql.Date;
@@ -158,7 +160,10 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
 
     @Override
     public PreparedStatement toExternal(
-            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement 
statement)
+            TableSchema tableSchema,
+            @Nullable TableSchema databaseTableSchema,
+            SeaTunnelRow row,
+            PreparedStatement statement)
             throws SQLException {
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         String[] sourceTypes =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index c98f50ba92..88e658fc38 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -336,7 +336,9 @@ public class FieldNamedPreparedStatement implements 
PreparedStatement {
 
     @Override
     public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
-        throw new UnsupportedOperationException();
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setBinaryStream(index, x);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
index 9cf8b95863..ae08d027f7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/InsertOrUpdateBatchStatementExecutor.java
@@ -26,6 +26,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import javax.annotation.Nullable;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -42,6 +44,7 @@ public class InsertOrUpdateBatchStatementExecutor
     private final TableSchema keyTableSchema;
     private final Function<SeaTunnelRow, SeaTunnelRow> keyExtractor;
     @NonNull private final TableSchema valueTableSchema;
+    @Nullable private final TableSchema databaseTableSchema;
     @NonNull private final JdbcRowConverter rowConverter;
     private transient PreparedStatement existStatement;
     private transient PreparedStatement insertStatement;
@@ -53,6 +56,7 @@ public class InsertOrUpdateBatchStatementExecutor
             StatementFactory insertStmtFactory,
             StatementFactory updateStmtFactory,
             TableSchema valueTableSchema,
+            TableSchema databaseTableSchema,
             JdbcRowConverter rowConverter) {
         this(
                 null,
@@ -61,6 +65,7 @@ public class InsertOrUpdateBatchStatementExecutor
                 null,
                 null,
                 valueTableSchema,
+                databaseTableSchema,
                 rowConverter);
     }
 
@@ -81,14 +86,14 @@ public class InsertOrUpdateBatchStatementExecutor
                 insertStatement.executeBatch();
                 insertStatement.clearBatch();
             }
-            rowConverter.toExternal(valueTableSchema, record, updateStatement);
+            rowConverter.toExternal(valueTableSchema, databaseTableSchema, record, updateStatement);
             updateStatement.addBatch();
         } else {
             if (preExistFlag != null && preExistFlag) {
                 updateStatement.executeBatch();
                 updateStatement.clearBatch();
             }
-            rowConverter.toExternal(valueTableSchema, record, insertStatement);
+            rowConverter.toExternal(valueTableSchema, databaseTableSchema, record, insertStatement);
             insertStatement.addBatch();
         }
 
@@ -147,7 +152,7 @@ public class InsertOrUpdateBatchStatementExecutor
     }
 
     private boolean exist(SeaTunnelRow pk) throws SQLException {
-        rowConverter.toExternal(keyTableSchema, pk, existStatement);
+        rowConverter.toExternal(keyTableSchema, databaseTableSchema, pk, existStatement);
         try (ResultSet resultSet = existStatement.executeQuery()) {
             return resultSet.next();
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
index a2f0add260..c7e6f8e2ce 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -24,6 +24,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import javax.annotation.Nullable;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -32,6 +34,7 @@ import java.sql.SQLException;
 public class SimpleBatchStatementExecutor implements 
JdbcBatchStatementExecutor<SeaTunnelRow> {
     @NonNull private final StatementFactory statementFactory;
     @NonNull private final TableSchema tableSchema;
+    @Nullable private final TableSchema databaseTableSchema;
     @NonNull private final JdbcRowConverter converter;
     private transient PreparedStatement statement;
 
@@ -42,7 +45,7 @@ public class SimpleBatchStatementExecutor implements 
JdbcBatchStatementExecutor<
 
     @Override
     public void addToBatch(SeaTunnelRow record) throws SQLException {
-        converter.toExternal(tableSchema, record, statement);
+        converter.toExternal(tableSchema, databaseTableSchema, record, statement);
         statement.addBatch();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 7ab289edc1..521b65ed94 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -50,6 +50,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
     protected JdbcDialect dialect;
     protected TablePath sinkTablePath;
     protected TableSchema tableSchema;
+    protected TableSchema databaseTableSchema;
     protected transient boolean isOpen;
     protected JdbcConnectionProvider connectionProvider;
     protected JdbcSinkConfig jdbcSinkConfig;
@@ -76,7 +77,11 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
         }
         this.outputFormat =
                 new JdbcOutputFormatBuilder(
-                                dialect, connectionProvider, jdbcSinkConfig, 
tableSchema)
+                                dialect,
+                                connectionProvider,
+                                jdbcSinkConfig,
+                                tableSchema,
+                                databaseTableSchema)
                         .build();
         this.outputFormat.open();
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
index d14cf59211..874d04135f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -77,6 +77,7 @@ public class JdbcExactlyOnceSinkWriter extends 
AbstractJdbcSinkWriter<Void> {
             JdbcDialect dialect,
             JdbcSinkConfig jdbcSinkConfig,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             List<JdbcSinkState> states) {
         checkArgument(
                 jdbcSinkConfig.getJdbcConnectionConfig().getMaxRetries() == 0,
@@ -95,7 +96,9 @@ public class JdbcExactlyOnceSinkWriter extends 
AbstractJdbcSinkWriter<Void> {
                 
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
         this.xaFacade = (XaFacade) this.connectionProvider;
         this.outputFormat =
-                new JdbcOutputFormatBuilder(dialect, xaFacade, jdbcSinkConfig, 
tableSchema).build();
+                new JdbcOutputFormatBuilder(
+                                dialect, xaFacade, jdbcSinkConfig, tableSchema, databaseTableSchema)
+                        .build();
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2213a90808..25fc39005d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.schema.SchemaChangeType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog;
@@ -52,6 +53,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -60,6 +63,7 @@ import java.util.Optional;
 
 import static 
org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
+@Slf4j
 public class JdbcSink
         implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
                 SupportSaveMode,
@@ -105,6 +109,11 @@ public class JdbcSink
 
     @Override
     public AbstractJdbcSinkWriter createWriter(SinkWriter.Context context) {
+        try {
+            Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         TablePath sinkTablePath = catalogTable.getTablePath();
         AbstractJdbcSinkWriter sinkWriter;
         if (jdbcSinkConfig.isExactlyOnce()) {
@@ -116,18 +125,30 @@ public class JdbcSink
                             dialect,
                             jdbcSinkConfig,
                             tableSchema,
+                            getDatabaseTableSchema().orElse(null),
                             new ArrayList<>());
         } else {
-            if (catalogTable != null && catalogTable.getTableSchema().getPrimaryKey() != null) {
+            if (catalogTable.getTableSchema().getPrimaryKey() != null) {
                 String keyName = tableSchema.getPrimaryKey().getColumnNames().get(0);
                 int index = tableSchema.toPhysicalRowDataType().indexOf(keyName);
                 if (index > -1) {
                     return new JdbcSinkWriter(
-                            sinkTablePath, dialect, jdbcSinkConfig, 
tableSchema, index);
+                            sinkTablePath,
+                            dialect,
+                            jdbcSinkConfig,
+                            tableSchema,
+                            getDatabaseTableSchema().orElse(null),
+                            index);
                 }
             }
             sinkWriter =
-                    new JdbcSinkWriter(sinkTablePath, dialect, jdbcSinkConfig, 
tableSchema, null);
+                    new JdbcSinkWriter(
+                            sinkTablePath,
+                            dialect,
+                            jdbcSinkConfig,
+                            tableSchema,
+                            getDatabaseTableSchema().orElse(null),
+                            null);
         }
         return sinkWriter;
     }
@@ -135,6 +156,11 @@ public class JdbcSink
     @Override
     public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(
             SinkWriter.Context context, List<JdbcSinkState> states) throws IOException {
+        try {
+            Class.forName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
         TablePath sinkTablePath = catalogTable.getTablePath();
         if (jdbcSinkConfig.isExactlyOnce()) {
             return new JdbcExactlyOnceSinkWriter(
@@ -144,11 +170,37 @@ public class JdbcSink
                     dialect,
                     jdbcSinkConfig,
                     tableSchema,
+                    getDatabaseTableSchema().orElse(null),
                     states);
         }
         return SeaTunnelSink.super.restoreWriter(context, states);
     }
 
+    private Optional<TableSchema> getDatabaseTableSchema() {
+        Optional<Catalog> catalogOptional = getCatalog();
+        FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE);
+        String fieldIde =
+                fieldIdeEnumEnum == null
+                        ? FieldIdeEnum.ORIGINAL.getValue()
+                        : fieldIdeEnumEnum.getValue();
+        TablePath tablePath =
+                TablePath.of(
+                        catalogTable.getTableId().getDatabaseName(),
+                        catalogTable.getTableId().getSchemaName(),
+                        CatalogUtils.quoteTableIdentifier(
+                                catalogTable.getTableId().getTableName(), fieldIde));
+        if (catalogOptional.isPresent()) {
+            try (Catalog catalog = catalogOptional.get()) {
+                catalog.open();
+                return Optional.of(catalog.getTable(tablePath).getTableSchema());
+            } catch (TableNotExistException e) {
+                log.warn("table {} not exist when get the database catalog table", tablePath);
+                return Optional.empty();
+            }
+        }
+        return Optional.empty();
+    }
+
     @Override
     public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>>
             createAggregatedCommitter() {
@@ -187,18 +239,7 @@ public class JdbcSink
             throw new RuntimeException(e);
         }
         if (catalogTable != null) {
-            if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
-                return Optional.empty();
-            }
-            if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
-                return Optional.empty();
-            }
-            // use query to write data can not support savemode
-            if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
-                return Optional.empty();
-            }
-            Optional<Catalog> catalogOptional =
-                    
JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+            Optional<Catalog> catalogOptional = getCatalog();
             if (catalogOptional.isPresent()) {
                 try {
                     Catalog catalog = catalogOptional.get();
@@ -242,6 +283,20 @@ public class JdbcSink
         return Optional.empty();
     }
 
+    private Optional<Catalog> getCatalog() {
+        if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
+            return Optional.empty();
+        }
+        if (StringUtils.isBlank(jdbcSinkConfig.getTable())) {
+            return Optional.empty();
+        }
+        // use query to write data can not support get catalog
+        if (StringUtils.isNotBlank(jdbcSinkConfig.getSimpleSql())) {
+            return Optional.empty();
+        }
+        return JdbcCatalogUtils.findCatalog(jdbcSinkConfig.getJdbcConnectionConfig(), dialect);
+    }
+
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
         return Optional.ofNullable(catalogTable);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 3f43b2088d..d3c9949dc3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -49,17 +49,23 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
             JdbcDialect dialect,
             JdbcSinkConfig jdbcSinkConfig,
             TableSchema tableSchema,
+            TableSchema databaseTableSchema,
             Integer primaryKeyIndex) {
         this.sinkTablePath = sinkTablePath;
         this.dialect = dialect;
         this.tableSchema = tableSchema;
+        this.databaseTableSchema = databaseTableSchema;
         this.jdbcSinkConfig = jdbcSinkConfig;
         this.primaryKeyIndex = primaryKeyIndex;
         this.connectionProvider =
                 
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
         this.outputFormat =
                 new JdbcOutputFormatBuilder(
-                                dialect, connectionProvider, jdbcSinkConfig, 
tableSchema)
+                                dialect,
+                                connectionProvider,
+                                jdbcSinkConfig,
+                                tableSchema,
+                                databaseTableSchema)
                         .build();
     }
 
@@ -97,7 +103,11 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
                         queueIndex);
         this.outputFormat =
                 new JdbcOutputFormatBuilder(
-                                dialect, connectionProvider, jdbcSinkConfig, 
tableSchema)
+                                dialect,
+                                connectionProvider,
+                                jdbcSinkConfig,
+                                tableSchema,
+                                databaseTableSchema)
                         .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index b0195837cb..7b21e27364 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -44,6 +44,7 @@ import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Statement;
 import java.sql.Timestamp;
@@ -53,6 +54,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class JdbcOracleIT extends AbstractJdbcIT {
 
@@ -81,6 +84,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                     + "    VARCHAR_10_COL                varchar2(10),\n"
                     + "    CHAR_10_COL                   char(10),\n"
                     + "    CLOB_COL                      clob,\n"
+                    + "    BLOB_COL                      blob,\n"
                     + "    NUMBER_1             number(1),\n"
                     + "    NUMBER_6             number(6),\n"
                     + "    NUMBER_10             number(10),\n"
@@ -104,6 +108,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                     + "    VARCHAR_10_COL                varchar2(10),\n"
                     + "    CHAR_10_COL                   char(10),\n"
                     + "    CLOB_COL                      clob,\n"
+                    + "    BLOB_COL                      blob,\n"
                     + "    NUMBER_1             number(1),\n"
                     + "    NUMBER_6             number(6),\n"
                     + "    NUMBER_10             number(10),\n"
@@ -125,6 +130,7 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                 "VARCHAR_10_COL",
                 "CHAR_10_COL",
                 "CLOB_COL",
+                "BLOB_COL",
                 "NUMBER_1",
                 "NUMBER_6",
                 "NUMBER_10",
@@ -230,6 +236,11 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                                 String.format("f%s", i),
                                 String.format("f%s", i),
                                 String.format("f%s", i),
+                                // set value bytes more than 4000bytes
+                                IntStream.range(0, 4000)
+                                        .mapToObj(d -> d + "")
+                                        .collect(Collectors.joining(","))
+                                        .getBytes(StandardCharsets.UTF_8),
                                 1,
                                 i * 10,
                                 i * 1000,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
index 4df8c7b993..d2e06158b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink.conf
@@ -30,7 +30,7 @@ source {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
+    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
@@ -46,7 +46,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
index 1988b48872..33cf33638f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select1.conf
@@ -31,7 +31,7 @@ source {
     user = testUser
     password = testPassword
     use_select_count = true
-    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
+    query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL FROM E2E_TABLE_SOURCE"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
@@ -47,7 +47,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
index 4d01da5c72..e9e997ea90 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select2.conf
@@ -47,7 +47,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
index 94a850fdd0..2be0b51224 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_oracle_source_to_sink_use_select3.conf
@@ -48,7 +48,7 @@ sink {
     url = "jdbc:oracle:thin:@e2e_oracleDb:1521/TESTUSER"
     user = testUser
     password = testPassword
-    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+    query = "INSERT INTO E2E_TABLE_SINK (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,BLOB_COL,NUMBER_1,NUMBER_6,NUMBER_10,NUMBER_3_SF_2_DP,NUMBER_7_SF_N2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ,XML_TYPE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
     properties {
        database.oracle.jdbc.timezoneAsRegion = "false"
     }

Reply via email to