This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f4178c72f1 [Fix][Connector-V2] Fix OceanBase Oracle create unsupported 
data type (#9383)
f4178c72f1 is described below

commit f4178c72f1d217b5b2c177f4b8b1d804fdde20ad
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jun 11 09:54:30 2025 +0800

    [Fix][Connector-V2] Fix OceanBase Oracle create unsupported data type 
(#9383)
---
 .../catalog/oceanbase/OceanBaseCatalogFactory.java |  2 +-
 .../catalog/oceanbase/OceanBaseOracleCatalog.java  |  6 ++
 .../OceanBaseOracleCreateTableSqlBuilder.java      | 84 +++++++++++++++++++++
 .../jdbc/catalog/oracle/OracleCatalog.java         |  2 +-
 .../oracle/OracleCreateTableSqlBuilder.java        | 15 +++-
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |  2 +-
 .../dialect/oceanbase/OceanBaseDialectFactory.java |  2 +-
 .../oceanbase/OceanBaseMySqlTypeConverter.java     |  6 +-
 .../dialect/oceanbase/OceanBaseMysqlDialect.java   |  2 +-
 .../oceanbase/OceanBaseMysqlJdbcRowConverter.java  |  2 +-
 .../dialect/oracle/OracleTypeConverter.java        |  5 ++
 .../OceanBaseOracleCreateTableSqlBuilderTest.java  | 87 ++++++++++++++++++++++
 .../oracle/OracleCreateTableSqlBuilderTest.java    |  6 +-
 13 files changed, 209 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
index c13779b9e1..faebf5881a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseCatalogFactory.java
@@ -45,7 +45,7 @@ public class OceanBaseCatalogFactory implements 
CatalogFactory {
 
     @Override
     public String factoryIdentifier() {
-        return DatabaseIdentifier.OCENABASE;
+        return DatabaseIdentifier.OCEANBASE;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
index 4557d7667a..4eda157f2b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCatalog.java
@@ -99,4 +99,10 @@ public class OceanBaseOracleCatalog extends OracleCatalog {
 
         createTableInternal(tablePath, table, createIndex);
     }
+
+    @Override
+    protected List<String> getCreateTableSqls(
+            TablePath tablePath, CatalogTable table, boolean createIndex) {
+        return new OceanBaseOracleCreateTableSqlBuilder(table, 
createIndex).build(tablePath);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..afd76f3488
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oracle.OracleCreateTableSqlBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Locale;
+
+/**
+ * Builds {@code CREATE TABLE} statements for OceanBase in Oracle compatible mode.
+ *
+ * <p>Extends the Oracle builder and overrides only the column definition, so that Oracle
+ * source types which OceanBase cannot create are replaced with a supported type.
+ */
+public class OceanBaseOracleCreateTableSqlBuilder extends OracleCreateTableSqlBuilder {
+
+    /**
+     * @param catalogTable table whose schema is rendered
+     * @param createIndex forwarded unchanged to {@link OracleCreateTableSqlBuilder}
+     */
+    public OceanBaseOracleCreateTableSqlBuilder(CatalogTable catalogTable, boolean createIndex) {
+        super(catalogTable, createIndex);
+    }
+
+    /**
+     * Renders one column definition: quoted name, column type and an optional {@code NOT NULL}.
+     *
+     * <p>The column type is resolved in this order: the explicit sink type; the non-blank
+     * source type when the source catalog is OceanBase (kept as is) or Oracle (unsupported
+     * types are replaced); otherwise the type produced by {@link OracleTypeConverter#reconvert}.
+     *
+     * @param column column to render
+     * @return the column definition fragment of the {@code CREATE TABLE} statement
+     */
+    @Override
+    protected String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+
+        String columnType = null;
+        if (column.getSinkType() != null) {
+            columnType = column.getSinkType();
+        } else if (StringUtils.isNotBlank(column.getSourceType())) {
+            if (StringUtils.equalsIgnoreCase(DatabaseIdentifier.OCEANBASE, sourceCatalogName)) {
+                columnType = column.getSourceType();
+            } else if (StringUtils.equalsIgnoreCase(DatabaseIdentifier.ORACLE, sourceCatalogName)) {
+                columnType = toOceanBaseType(column.getSourceType());
+            }
+        }
+
+        if (columnType == null) {
+            // no usable sink/source type: derive the type from the SeaTunnel data type
+            columnType = OracleTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        }
+
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        return columnSql.toString();
+    }
+
+    /**
+     * Replaces Oracle types that are not supported in OceanBase Oracle compatible mode. See
+     * https://www.oceanbase.com/docs/enterprise-oceanbase-database-cn-10000000000355002 and
+     * https://www.oceanbase.com/docs/enterprise-oms-doc-cn-1000000002530110
+     *
+     * @param oracleSourceType non-blank type name reported by the Oracle source
+     * @return the replacement type, or {@code oracleSourceType} unchanged when no rule matches
+     */
+    private static String toOceanBaseType(String oracleSourceType) {
+        // Locale.ROOT keeps the match independent of the JVM default locale; with e.g. a
+        // Turkish default locale "bfile".toUpperCase() would not yield "BFILE".
+        switch (oracleSourceType.toUpperCase(Locale.ROOT)) {
+            case OracleTypeConverter.ORACLE_LONG:
+                return OracleTypeConverter.ORACLE_CLOB;
+            case OracleTypeConverter.ORACLE_LONG_RAW:
+            case OracleTypeConverter.ORACLE_BFILE:
+                return OracleTypeConverter.ORACLE_BLOB;
+            case OracleTypeConverter.ORACLE_NCLOB:
+                // set max length to 32767, which is the maximum length supported by
+                // OceanBase
+                return OracleTypeConverter.ORACLE_NVARCHAR2 + "(32767)";
+            case OracleTypeConverter.ORACLE_REAL:
+                return OracleTypeConverter.ORACLE_FLOAT;
+            default:
+                return oracleSourceType;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
index d228e0b04a..172965e04c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCatalog.java
@@ -132,7 +132,7 @@ public class OracleCatalog extends AbstractJdbcCatalog {
     @Override
     protected String getCreateTableSql(
             TablePath tablePath, CatalogTable table, boolean createIndex) {
-        return new OracleCreateTableSqlBuilder(table, 
createIndex).build(tablePath).get(0);
+        return getCreateTableSqls(tablePath, table, createIndex).get(0);
     }
 
     protected List<String> getCreateTableSqls(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
index 4bb7f262dc..f0812d70d6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java
@@ -36,13 +36,15 @@ public class OracleCreateTableSqlBuilder {
 
     private List<Column> columns;
     private PrimaryKey primaryKey;
-    private String sourceCatalogName;
+    private String comment;
+    protected String sourceCatalogName;
     private String fieldIde;
     private boolean createIndex;
 
     public OracleCreateTableSqlBuilder(CatalogTable catalogTable, boolean 
createIndex) {
         this.columns = catalogTable.getTableSchema().getColumns();
         this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.comment = catalogTable.getComment();
         this.sourceCatalogName = catalogTable.getCatalogName();
         this.fieldIde = catalogTable.getOptions().get("fieldIde");
         this.createIndex = createIndex;
@@ -72,6 +74,15 @@ public class OracleCreateTableSqlBuilder {
         createTableSql.append(String.join(",\n", columnSqls));
         createTableSql.append("\n)");
         sqls.add(createTableSql.toString());
+        if (comment != null) {
+            String commentSql =
+                    "COMMENT ON TABLE "
+                            + tablePath.getSchemaAndTableName("\"")
+                            + " IS '"
+                            + comment
+                            + "'";
+            sqls.add(commentSql);
+        }
         List<String> commentSqls =
                 columns.stream()
                         .filter(column -> 
StringUtils.isNotBlank(column.getComment()))
@@ -84,7 +95,7 @@ public class OracleCreateTableSqlBuilder {
         return sqls;
     }
 
-    String buildColumnSql(Column column) {
+    protected String buildColumnSql(Column column) {
         StringBuilder columnSql = new StringBuilder();
         columnSql.append("\"").append(column.getName()).append("\" ");
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 17f672213c..55d2ee7865 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -38,7 +38,7 @@ public class DatabaseIdentifier {
     public static final String TABLE_STORE = "Tablestore";
     public static final String TERADATA = "Teradata";
     public static final String VERTICA = "Vertica";
-    public static final String OCENABASE = "OceanBase";
+    public static final String OCEANBASE = "OceanBase";
     public static final String TIDB = "TiDB";
     public static final String XUGU = "XUGU";
     public static final String IRIS = "IRIS";
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index 700015bdb6..d31e750006 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -30,7 +30,7 @@ import javax.annotation.Nonnull;
 public class OceanBaseDialectFactory implements JdbcDialectFactory {
     @Override
     public String dialectFactoryName() {
-        return DatabaseIdentifier.OCENABASE;
+        return DatabaseIdentifier.OCEANBASE;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index f135701879..e3463fc914 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -108,7 +108,7 @@ public class OceanBaseMySqlTypeConverter
 
     @Override
     public String identifier() {
-        return DatabaseIdentifier.OCENABASE;
+        return DatabaseIdentifier.OCEANBASE;
     }
 
     @Override
@@ -308,7 +308,7 @@ public class OceanBaseMySqlTypeConverter
                 break;
             default:
                 throw CommonError.convertToSeaTunnelTypeError(
-                        DatabaseIdentifier.OCENABASE, mysqlDataType, 
typeDefine.getName());
+                        DatabaseIdentifier.OCEANBASE, mysqlDataType, 
typeDefine.getName());
         }
         return builder.build();
     }
@@ -525,7 +525,7 @@ public class OceanBaseMySqlTypeConverter
                 break;
             default:
                 throw CommonError.convertToConnectorTypeError(
-                        DatabaseIdentifier.OCENABASE,
+                        DatabaseIdentifier.OCEANBASE,
                         column.getDataType().getSqlType().name(),
                         column.getName());
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
index c89bde6427..320ced4083 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -65,7 +65,7 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
 
     @Override
     public String dialectName() {
-        return DatabaseIdentifier.OCENABASE;
+        return DatabaseIdentifier.OCEANBASE;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
index 5bf23ea1fe..4984568e18 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlJdbcRowConverter.java
@@ -51,7 +51,7 @@ import java.util.Optional;
 public class OceanBaseMysqlJdbcRowConverter extends AbstractJdbcRowConverter {
     @Override
     public String converterName() {
-        return DatabaseIdentifier.OCENABASE;
+        return DatabaseIdentifier.OCEANBASE;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
index f4a7e2b82b..2b50ddfb9f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeConverter.java
@@ -71,6 +71,7 @@ public class OracleTypeConverter implements 
TypeConverter<BasicTypeDefine> {
     public static final String ORACLE_BLOB = "BLOB";
     public static final String ORACLE_RAW = "RAW";
     public static final String ORACLE_LONG_RAW = "LONG RAW";
+    public static final String ORACLE_BFILE = "BFILE";
 
     public static final int MAX_PRECISION = 38;
     public static final int DEFAULT_PRECISION = MAX_PRECISION;
@@ -218,6 +219,10 @@ public class OracleTypeConverter implements 
TypeConverter<BasicTypeDefine> {
                     builder.columnLength(BYTES_4GB - 1);
                 }
                 break;
+            case ORACLE_BFILE:
+                builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                builder.columnLength(BYTES_4GB - 1);
+                break;
             case ORACLE_RAW:
                 builder.dataType(PrimitiveByteArrayType.INSTANCE);
                 if (typeDefine.getLength() == null || typeDefine.getLength() 
== 0) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilderTest.java
new file mode 100644
index 0000000000..4e580329d2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseOracleCreateTableSqlBuilderTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.oceanbase;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class OceanBaseOracleCreateTableSqlBuilderTest {
+
+    /**
+     * Checks the column SQL generated for Oracle source types, with "Oracle" as the source
+     * catalog name: LONG, LONG RAW, BFILE, NCLOB and REAL are expected to be replaced, any
+     * other type is expected to be kept as is.
+     */
+    @Test
+    public void testColumnWithUnSupportedType() {
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"field"}, new SeaTunnelDataType[] {BasicType.STRING_TYPE});
+        CatalogTable oracleTable =
+                CatalogTableUtil.getCatalogTable(
+                        "Oracle", "test_database", "test_schema", "test_table", rowType);
+        OceanBaseOracleCreateTableSqlBuilder builder =
+                new OceanBaseOracleCreateTableSqlBuilder(oracleTable, false);
+
+        // each row: {source type, expected column SQL}; rows are checked top to bottom
+        String[][] expectations = {
+            {"LONG", "\"col1\" CLOB NOT NULL"},
+            {"LONG RAW", "\"col1\" BLOB NOT NULL"},
+            {"BFILE", "\"col1\" BLOB NOT NULL"},
+            {"NCLOB", "\"col1\" NVARCHAR2(32767) NOT NULL"},
+            {"REAL", "\"col1\" FLOAT NOT NULL"},
+            {"OTHERTYPE", "\"col1\" OTHERTYPE NOT NULL"},
+        };
+
+        // isNullable() is never stubbed, so the mock returns false and NOT NULL is appended
+        Column column = mock(Column.class);
+        for (String[] expectation : expectations) {
+            when(column.getSourceType()).thenReturn(expectation[0]);
+            when(column.getDataType()).thenReturn((SeaTunnelDataType) BasicType.INT_TYPE);
+            when(column.getName()).thenReturn("col1");
+            Assertions.assertEquals(expectation[1], builder.buildColumnSql(column));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilderTest.java
index c2043c5e7b..255ef1ae9e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilderTest.java
@@ -39,6 +39,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -111,7 +112,8 @@ public class OracleCreateTableSqlBuilderTest {
 
         OracleCreateTableSqlBuilder oracleCreateTableSqlBuilder =
                 new OracleCreateTableSqlBuilder(catalogTable, true);
-        String createTableSql = 
oracleCreateTableSqlBuilder.build(tablePath).get(0);
+        List<String> sqls = oracleCreateTableSqlBuilder.build(tablePath);
+        String createTableSql = sqls.get(0);
         // create table sql is change; The old unit tests are no longer 
applicable
         String expect =
                 "CREATE TABLE \"test_table\" (\n"
@@ -131,6 +133,8 @@ public class OracleCreateTableSqlBuilderTest {
         CONSOLE.println(replacedStr2);
         Assertions.assertEquals(replacedStr2, replacedStr1);
 
+        Assertions.assertEquals("COMMENT ON TABLE \"test_table\" IS 'User 
table'", sqls.get(1));
+
         // skip index
         OracleCreateTableSqlBuilder oracleCreateTableSqlBuilderSkipIndex =
                 new OracleCreateTableSqlBuilder(catalogTable, false);

Reply via email to