This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d6b562bf52 [Feature]Add Kingbase Catalog Support (#10427)
d6b562bf52 is described below

commit d6b562bf527ef83dd3dbb71de74fc0c9d9a45dec
Author: panda <[email protected]>
AuthorDate: Tue Mar 17 15:10:35 2026 +0800

    [Feature]Add Kingbase Catalog Support (#10427)
    
    Co-authored-by: qcc-lyb <[email protected]>
---
 docs/en/connectors/source/Kingbase.md              |   9 +
 docs/zh/connectors/source/Kingbase.md              |   9 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  13 +
 .../jdbc/catalog/kingbase/KingbaseCatalog.java     | 261 ++++++++++
 .../catalog/kingbase/KingbaseCatalogFactory.java   |  62 +++
 .../kingbase/KingbaseCreateTableSqlBuilder.java    | 148 ++++++
 .../internal/dialect/kingbase/KingbaseDialect.java |  37 ++
 .../dialect/kingbase/KingbaseDialectFactory.java   |   7 +
 .../dialect/kingbase/KingbaseTypeConverter.java    | 111 ++++
 .../internal/dialect/mysql/MySqlTypeConverter.java |  76 +--
 .../jdbc/catalog/kingbase/KingbaseCatalogTest.java | 130 +++++
 .../KingbaseCreateTableSqlBuilderTest.java         | 571 +++++++++++++++++++++
 .../container/AbstractKingbaseContainerTest.java   | 128 +++++
 .../container/KingbaseCatalogContainerTest.java    | 364 +++++++++++++
 .../container/KingbaseDialectContainerTest.java    | 318 ++++++++++++
 15 files changed, 2206 insertions(+), 38 deletions(-)

diff --git a/docs/en/connectors/source/Kingbase.md 
b/docs/en/connectors/source/Kingbase.md
index 6d0d9fd354..4c0d74388e 100644
--- a/docs/en/connectors/source/Kingbase.md
+++ b/docs/en/connectors/source/Kingbase.md
@@ -70,6 +70,15 @@ Read external data source data through JDBC.
 | partition_upper_bound        | BigDecimal | No       | -               | The 
partition_column max value for scan, if not set SeaTunnel will query database 
get max value.                                                                  
                                                                                
                    |
 | partition_num                | Int        | No       | job parallelism | The 
number of partition count, only support positive integer. Default value is job 
parallelism.                                                                    
                                                                                
                   |
 | fetch_size                   | Int        | No       | 0               | For 
queries that return a large number of objects, you can configure <br/> the row 
fetch size used in the query to improve performance by <br/> reducing the 
number database hits required to satisfy the selection criteria.<br/> Zero 
means use jdbc default value. |
+| use_regex                    | Boolean    | No       | false           | 
Control regular expression matching for table_path. When set to `true`, the 
table_path will be treated as a regular expression pattern. When set to `false` 
or not specified, the table_path will be treated as an exact path (no regex 
matching).                 |
+| table_path                                 | String     | No       | -       
        | The full path of the table; you can use this configuration 
instead of `query`. <br/>example: <br/>"test_schema.table1"                     
                                                                                
                                                                                
                                                                                
                 [...]
+| table_list                                 | Array      | No       | -       
        | The list of tables to be read, you can use this configuration instead 
of `table_path` example: ```[{ table_path = "testdb.table1"}, {table_path = 
"testdb.table2", query = "select id, name from testdb.table2"}]```            
                                                                                
                                                                                
                  [...]
+| where_condition                            | String     | No       | -       
        | Common row filter conditions for all tables/queries, must start with 
`where`. for example `where id > 100`                                           
                                                                                
                                                                                
                                                                                
               [...]
+| split.size                                 | Int        | No       | 8096    
        | The split size (number of rows) of table, captured tables are split 
into multiple splits when the table is read.                                    
                                                                                
                                                                                
                                                                                
                [...]
+| split.even-distribution.factor.lower-bound | Double     | No       | 0.05    
        | The lower bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be greater than or equal to this lower 
bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is less, 
the table will be considered a [...]
+| split.even-distribution.factor.upper-bound | Double     | No       | 100     
        | The upper bound of the chunk key distribution factor. This factor is 
used to determine whether the table data is evenly distributed. If the 
distribution factor is calculated to be less than or equal to this upper bound 
(i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be 
optimized for even distribution. Otherwise, if the distribution factor is 
greater, the table will be considered a [...]
+| split.sample-sharding.threshold            | Int        | No       | 10000   
        | This configuration specifies the threshold of estimated shard count 
to trigger the sample sharding strategy. When the distribution factor is 
outside the bounds specified by 
`split.even-distribution.factor.upper-bound` and 
`split.even-distribution.factor.lower-bound`, and the estimated shard count 
(calculated as approximate row count / chunk size) exceeds this threshold, the 
sample sharding st [...]
+| split.inverse-sampling.rate                | Int        | No       | 1000    
        | The inverse of the sampling rate used in the sample sharding 
strategy. For example, if this value is set to 1000, it means a 1/1000 sampling 
rate is applied during the sampling process. This option provides flexibility 
in controlling the granularity of the sampling, thus affecting the final number 
of shards. It's especially useful when dealing with very large datasets where a 
lower sampling rate is p [...]
 | common-options               |            | No       | -               | 
Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details                
                                                                                
                                                     |
 
 ### Tips
diff --git a/docs/zh/connectors/source/Kingbase.md 
b/docs/zh/connectors/source/Kingbase.md
index 606818224d..4980e86e38 100644
--- a/docs/zh/connectors/source/Kingbase.md
+++ b/docs/zh/connectors/source/Kingbase.md
@@ -70,6 +70,15 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | partition_upper_bound | BigDecimal | 否 | - | partition_column 
的最大值用于扫描,如果未设置,SeaTunnel 将查询数据库获取最大值。 |
 | partition_num | Int | 否 | job parallelism | 分割数量,仅支持正整数。默认值是任务并行度。 |
 | fetch_size | Int | 否 | 0 | 
对于返回大量对象的查询,您可以配置查询中使用的行提取大小,以通过减少满足选择条件所需的数据库命中次数来提高性能。零表示使用 jdbc 默认值。 |
+| use_regex                                  | Boolean    | 否    | false | 
控制表路径的正则表达式匹配。当设置为true时,table_path 将被视为正则表达式模式。当设置为false或未指定时,table_path 
将被视为精确路径(不进行正则匹配)。                                                              
                                                              |
+| table_path                                 | String     | 否    | -     | 
表的完整路径,您可以使用此配置代替 `query`。<br/>示例:<br/>"testdb.table1"                          
        |
+| table_list                                 | Array      | 否    | -     | 
要读取的表的列表,您可以使用此配置代替 `table_path`,示例如下: ```[{ table_path = "testdb.table1"}, 
{table_path = "testdb.table2", query = "select id, name from 
testdb.table2"}]```                                                         |
+| where_condition                            | String     | 否    | -     | 
所有表/查询的通用行过滤条件,必须以 `where` 开头。例如 `where id > 100`。                              
                                                                                
                                                       |
+| split.size                                 | Int        | 否    | 8096  | 
表的分割大小(行数),当读取表时,捕获的表会被分割成多个分片。                                                 
                                                                                
                                                       |
+| split.even-distribution.factor.lower-bound | Double     | 否    | 0.05  | 
分片键分布因子的下限。该因子用于判断表数据的分布是否均匀。如果计算得到的分布因子大于或等于该下限(即,(MAX(id) - MIN(id) + 1) / 
行数),则会对表的分片进行优化,以确保数据的均匀分布。反之,如果分布因子较低,则表数据将被视为分布不均匀。如果估算的分片数量超过 
`sample-sharding.threshold` 所指定的值,则会采用基于采样的分片策略。默认值为 0.05。               |
+| split.even-distribution.factor.upper-bound | Double     | 否    | 100   | 
分片键分布因子的上限。该因子用于判断表数据的分布是否均匀。如果计算得到的分布因子小于或等于该上限(即,(MAX(id) - MIN(id) + 1) / 
行数),则会对表的分片进行优化,以确保数据的均匀分布。反之,如果分布因子较大,则表数据将被视为分布不均匀,并且如果估算的分片数量超过 
`sample-sharding.threshold` 所指定的值,则会采用基于采样的分片策略。默认值为 100.0。            |
+| split.sample-sharding.threshold            | Int        | 否    | 10000 | 
此配置指定了触发样本分片策略的估算分片数阈值。当分布因子超出由 
`split.even-distribution.factor.upper-bound` 和 
`split.even-distribution.factor.lower-bound` 指定的范围,并且估算的分片数量(计算方法为大致行数 / 
分片大小)超过此阈值时,将使用样本分片策略。此配置有助于更高效地处理大型数据集。默认值为 10000 个分片。 |
+| split.inverse-sampling.rate                | Int        | 否    | 1000  | 
样本分片策略中使用的采样率的倒数。例如,如果该值设置为 1000,则表示在采样过程中应用 1/1000 
的采样率。此选项提供了灵活性,可以控制采样的粒度,从而影响最终的分片数量。特别适用于处理非常大的数据集,在这种情况下通常会选择较低的采样率。默认值为 
1000。                                                                           
        |
 | common-options | | 否 | - | 源插件通用参数,请参考 
[源通用选项](../common-options/source-common-options.md) 详见。 |
 
 ### 提示
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 7cb64cae34..71de3721cc 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -419,6 +419,19 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <!-- Testcontainers for unit tests -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalog.java
new file mode 100644
index 0000000000..c688d8541f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalog.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_METHOD;
+
+@Slf4j
+public class KingbaseCatalog extends AbstractJdbcCatalog {
+
+    protected static List<String> EXCLUDED_SCHEMAS =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            "INFORMATION_SCHEMA",
+                            "SYSAUDIT",
+                            "SYSLOGICAL",
+                            "SYS_CATALOG",
+                            "SYS_HM",
+                            "XLOG_RECORD_READ"));
+
+    private static final String SELECT_COLUMNS_SQL_TEMPLATE =
+            " SELECT \n"
+                    + "    a.attname AS column_name,\n"
+                    + "    CASE \n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('varchar', 'character varying') THEN 'VARCHAR'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('char', 'character') THEN 'CHAR'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) IN 
('boolean', 'bool') THEN 'BOOL'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'real' THEN 'FLOAT4'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'double precision' THEN 'FLOAT8'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'integer' THEN 'INT4'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'bigint' THEN 'INT8'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'smallint' THEN 'INT2'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'time without time zone' THEN 'TIME'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'timestamp without time zone' THEN 'TIMESTAMP'\n"
+                    + "        WHEN lower(format_type(a.atttypid, NULL)) = 
'timestamp with time zone' THEN 'TIMESTAMPTZ'\n"
+                    + "        ELSE format_type(a.atttypid, NULL)\n"
+                    + "    END AS type_name,\n"
+                    + "    format_type(a.atttypid, a.atttypmod) AS 
full_type_name,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ( 'CHAR','CHARACTER','VARCHAR','CHARACTER VARYING','BPCHAR') 
)\n"
+                    + "        THEN ABS(a.atttypmod)     \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) >> 16\n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('INT', 'INTEGER', 'SMALLINT', 'BIGINT'))\n"
+                    + "        THEN NULL\n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('TIME','TIMESTAMPTZ', 'TIMESTAMP'))\n"
+                    + "        THEN NULL\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_length,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) >> 16\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_precision,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.atttypid IN (SELECT oid FROM sys_type 
WHERE typname IN ('NUMERIC', 'DECIMAL'))\n"
+                    + "        THEN (a.atttypmod - 4) & 65535\n"
+                    + "        ELSE NULL\n"
+                    + "    END AS column_scale,\n"
+                    + "    d.description AS column_comment,\n"
+                    + "    pg_get_expr(ad.adbin, ad.adrelid) AS 
default_value,\n"
+                    + "    CASE \n"
+                    + "        WHEN a.attnotnull = false THEN 'YES'\n"
+                    + "        ELSE 'NO'\n"
+                    + "    END AS is_nullable\n"
+                    + "FROM \n"
+                    + "    sys_class c\n"
+                    + "    JOIN sys_namespace n ON c.relnamespace = n.oid\n"
+                    + "    JOIN sys_attribute a ON a.attrelid = c.oid\n"
+                    + "    LEFT JOIN sys_description d ON d.objoid = 
a.attrelid AND d.objsubid = a.attnum\n"
+                    + "    LEFT JOIN sys_attrdef ad ON ad.adrelid = a.attrelid 
AND ad.adnum = a.attnum\n"
+                    + "WHERE \n"
+                    + "    n.nspname = '%s' \n"
+                    + "    AND c.relname = '%s' \n"
+                    + "    AND a.attnum > 0 \n"
+                    + "    AND NOT a.attisdropped;";
+
+    public KingbaseCatalog(
+            String catalogName,
+            String username,
+            String pwd,
+            JdbcUrlUtil.UrlInfo urlInfo,
+            String defaultSchema,
+            String driverClass) {
+        super(catalogName, username, pwd, urlInfo, defaultSchema, driverClass);
+    }
+
+    @Override
+    protected String getListDatabaseSql() {
+        return "SELECT current_database();";
+    }
+
+    /**
+     * Override the databaseExists method because SELECT current_database() 
does not support WHERE
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException 
{
+        if (StringUtils.isBlank(databaseName)) {
+            return false;
+        }
+        try {
+            return querySQLResultExists(getUrlFromDatabaseName(databaseName), 
getListDatabaseSql());
+        } catch (SeaTunnelRuntimeException e) {
+            if 
(e.getSeaTunnelErrorCode().getCode().equals(UNSUPPORTED_METHOD.getCode())) {
+                log.warn(
+                        "The catalog: {} is not supported the 
getListDatabaseSql for databaseExists",
+                        this.catalogName);
+                return listDatabases().contains(databaseName);
+            }
+            throw e;
+        } catch (SQLException e) {
+            throw new CatalogException("查询数据库是否存在失败: " + databaseName, e);
+        }
+    }
+
+    @Override
+    protected String getCreateTableSql(
+            TablePath tablePath, CatalogTable table, boolean createIndex) {
+        return new KingbaseCreateTableSqlBuilder(table, 
createIndex).build(tablePath);
+    }
+
+    @Override
+    protected String getDropTableSql(TablePath tablePath) {
+        return String.format("DROP TABLE %s", 
tablePath.getSchemaAndTableName("\""));
+    }
+
+    @Override
+    protected String getListTableSql(String databaseName) {
+        return "SELECT SCHEMANAME ,TABLENAME FROM SYS_TABLES";
+    }
+
+    @Override
+    protected String getTableWithConditionSql(TablePath tablePath) {
+        return String.format(
+                getListTableSql(tablePath.getDatabaseName())
+                        + "  where SCHEMANAME = '%s' and TABLENAME = '%s';",
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
+    @Override
+    protected String getTableName(ResultSet rs) throws SQLException {
+        if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
+            return null;
+        }
+        return rs.getString(1) + "." + rs.getString(2);
+    }
+
+    @Override
+    protected String getSelectColumnsSql(TablePath tablePath) {
+        return String.format(
+                SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), 
tablePath.getTableName());
+    }
+
+    @Override
+    protected Column buildColumn(ResultSet resultSet) throws SQLException {
+        String columnName = resultSet.getString("COLUMN_NAME");
+        String typeName = resultSet.getString("TYPE_NAME");
+        String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
+        long columnLength = resultSet.getLong("COLUMN_LENGTH");
+        long columnPrecision = resultSet.getLong("COLUMN_PRECISION");
+        int columnScale = resultSet.getInt("COLUMN_SCALE");
+        String columnComment = resultSet.getString("COLUMN_COMMENT");
+        Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
+        boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
+
+        BasicTypeDefine typeDefine =
+                BasicTypeDefine.builder()
+                        .name(columnName)
+                        .columnType(fullTypeName)
+                        .dataType(typeName)
+                        .length(columnLength)
+                        .precision(columnPrecision)
+                        .scale(columnScale)
+                        .nullable(isNullable)
+                        .defaultValue(defaultValue)
+                        .comment(columnComment)
+                        .build();
+        return KingbaseTypeConverter.INSTANCE.convert(typeDefine);
+    }
+
+    @Override
+    protected String getOptionTableName(TablePath tablePath) {
+        return tablePath.getSchemaAndTableName();
+    }
+
+    @Override
+    public CatalogTable getTable(String sqlQuery) throws SQLException {
+        Connection defaultConnection = getConnection(defaultUrl);
+        return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
KingbaseTypeMapper());
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        return String.format(
+                "TRUNCATE TABLE \"%s\".\"%s\"",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected String getExistDataSql(TablePath tablePath) {
+        return String.format(
+                "select * from \"%s\".\"%s\" LIMIT 1",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        try {
+            return getConstraintKeys(
+                    metaData,
+                    tablePath.getDatabaseName(),
+                    tablePath.getSchemaName(),
+                    tablePath.getTableName());
+        } catch (SQLException e) {
+            log.info("Obtain constraint failure", e);
+            return new ArrayList<>();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
new file mode 100644
index 0000000000..6003a9a3f8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcCommonOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+/** {@link CatalogFactory} implementation that produces {@link KingbaseCatalog} instances. */
+@AutoService(Factory.class)
+public class KingbaseCatalogFactory implements CatalogFactory {
+
+    /** Returns {@link DatabaseIdentifier#KINGBASE} as the identifier of this factory. */
+    @Override
+    public String factoryIdentifier() {
+        return DatabaseIdentifier.KINGBASE;
+    }
+
+    /**
+     * Creates a {@link KingbaseCatalog} from the url, username, password, schema and driver
+     * options of {@link JdbcCommonOptions}.
+     *
+     * @param catalogName name given to the created catalog
+     * @param options configuration the JDBC options are read from
+     * @throws IllegalArgumentException if the url option is blank
+     */
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        final String jdbcUrl = options.get(JdbcCommonOptions.URL);
+        Preconditions.checkArgument(
+                StringUtils.isNoneBlank(jdbcUrl),
+                "Miss config <base-url>! Please check your config.");
+
+        final JdbcUrlUtil.UrlInfo parsedUrl = JdbcUrlUtil.getUrlInfo(jdbcUrl);
+        final String username = options.get(JdbcCommonOptions.USERNAME);
+        final String password = options.get(JdbcCommonOptions.PASSWORD);
+        final String schema = options.get(JdbcCommonOptions.SCHEMA);
+        final String driver = options.get(JdbcCommonOptions.DRIVER);
+
+        return new KingbaseCatalog(catalogName, username, password, parsedUrl, schema, driver);
+    }
+
+    /** Returns the option rule built from {@link JdbcCommonOptions#BASE_CATALOG_RULE}. */
+    @Override
+    public OptionRule optionRule() {
+        return JdbcCommonOptions.BASE_CATALOG_RULE.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..ffb679d632
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseTypeConverter;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Builds the {@code CREATE TABLE} statement, followed by optional {@code COMMENT ON COLUMN}
+ * statements, for a Kingbase table described by a SeaTunnel {@link CatalogTable}.
+ */
+public class KingbaseCreateTableSqlBuilder {
+
+    /** Constraint name prefix used when the primary key carries no name of its own. */
+    private static final String DEFAULT_PRIMARY_KEY_NAME = "pk";
+
+    /** Longest prefix of the primary key name kept in the generated constraint name. */
+    private static final int MAX_PRIMARY_KEY_NAME_LENGTH = 25;
+
+    private final List<Column> columns;
+    private final PrimaryKey primaryKey;
+    private final String sourceCatalogName;
+    private final String fieldIde;
+    private final boolean createIndex;
+
+    /**
+     * @param catalogTable table whose schema columns, primary key, catalog name and {@code
+     *     fieldIde} option drive the generated SQL
+     * @param createIndex whether the primary key constraint is emitted
+     */
+    public KingbaseCreateTableSqlBuilder(CatalogTable catalogTable, boolean createIndex) {
+        this.columns = catalogTable.getTableSchema().getColumns();
+        this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.sourceCatalogName = catalogTable.getCatalogName();
+        this.fieldIde = catalogTable.getOptions().get("fieldIde");
+        this.createIndex = createIndex;
+    }
+
+    /**
+     * Builds the SQL for {@code tablePath}.
+     *
+     * @return the {@code CREATE TABLE} statement; when at least one column has a non-blank
+     *     comment, {@code ;\n}-separated {@code COMMENT ON COLUMN} statements are appended
+     */
+    public String build(TablePath tablePath) {
+        // Computed once: used by the CREATE TABLE header and by every column comment.
+        String quotedTableName = tablePath.getSchemaAndTableName("\"");
+
+        StringBuilder createTableSql = new StringBuilder();
+        createTableSql.append("CREATE TABLE ").append(quotedTableName).append(" (\n");
+
+        List<String> columnSqls =
+                columns.stream()
+                        .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+                        .collect(Collectors.toList());
+
+        // Add primary key directly in the create table statement
+        if (createIndex
+                && primaryKey != null
+                && primaryKey.getColumnNames() != null
+                && !primaryKey.getColumnNames().isEmpty()) {
+            columnSqls.add(buildPrimaryKeySql(primaryKey));
+        }
+
+        createTableSql.append(String.join(",\n", columnSqls));
+        createTableSql.append("\n)");
+
+        List<String> commentSqls =
+                columns.stream()
+                        .filter(column -> StringUtils.isNotBlank(column.getComment()))
+                        .map(column -> buildColumnCommentSql(column, quotedTableName))
+                        .collect(Collectors.toList());
+
+        if (!commentSqls.isEmpty()) {
+            createTableSql.append(";\n");
+            createTableSql.append(String.join(";\n", commentSqls));
+        }
+
+        return createTableSql.toString();
+    }
+
+    /**
+     * Builds one column definition: quoted name, column type and an optional {@code NOT NULL}.
+     *
+     * <p>The type is the column's sink type when set; otherwise its source type when the table
+     * comes from a Kingbase catalog and the source type is not blank; otherwise the type produced
+     * by {@link KingbaseTypeConverter}.
+     */
+    private String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+
+        String columnType;
+        if (column.getSinkType() != null) {
+            columnType = column.getSinkType();
+        } else if (StringUtils.equalsIgnoreCase(DatabaseIdentifier.KINGBASE, sourceCatalogName)
+                && StringUtils.isNotBlank(column.getSourceType())) {
+            columnType = column.getSourceType();
+        } else {
+            columnType = KingbaseTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        }
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        return columnSql.toString();
+    }
+
+    /**
+     * Builds the {@code CONSTRAINT ... PRIMARY KEY (...)} clause.
+     *
+     * <p>The constraint name is the primary key name cut to {@value
+     * #MAX_PRIMARY_KEY_NAME_LENGTH} characters plus {@code _} and four random hex characters. A
+     * primary key without a name uses {@value #DEFAULT_PRIMARY_KEY_NAME} as the prefix instead of
+     * failing with a {@link NullPointerException}.
+     */
+    private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+        String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4);
+        String columnNamesString =
+                primaryKey.getColumnNames().stream()
+                        .map(columnName -> "\"" + columnName + "\"")
+                        .collect(Collectors.joining(", "));
+
+        String primaryKeyStr = primaryKey.getPrimaryKey();
+        if (primaryKeyStr == null) {
+            primaryKeyStr = DEFAULT_PRIMARY_KEY_NAME;
+        }
+        if (primaryKeyStr.length() > MAX_PRIMARY_KEY_NAME_LENGTH) {
+            primaryKeyStr = primaryKeyStr.substring(0, MAX_PRIMARY_KEY_NAME_LENGTH);
+        }
+
+        return CatalogUtils.getFieldIde(
+                "CONSTRAINT "
+                        + primaryKeyStr
+                        + "_"
+                        + randomSuffix
+                        + " PRIMARY KEY ("
+                        + columnNamesString
+                        + ")",
+                fieldIde);
+    }
+
+    /**
+     * Builds a {@code COMMENT ON COLUMN} statement for {@code column} of the already quoted
+     * {@code tableName}; single quotes in the comment text are doubled.
+     */
+    private String buildColumnCommentSql(Column column, String tableName) {
+        StringBuilder columnCommentSql = new StringBuilder();
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde))
+                .append(tableName)
+                .append(".");
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\""))
+                .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+                .append(column.getComment().replace("'", "''"))
+                .append("'");
+        return columnCommentSql.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
index ecdd7dc236..72813c0f0a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 
 import java.util.Arrays;
 import java.util.Optional;
@@ -29,6 +30,14 @@ import java.util.stream.Collectors;
 
 public class KingbaseDialect implements JdbcDialect {
 
+    /**
+     * Field identifier mode handed to {@code getFieldIde} whenever an identifier is quoted;
+     * defaults to the value of {@link FieldIdeEnum#ORIGINAL}.
+     */
+    public String fieldIde = FieldIdeEnum.ORIGINAL.getValue();
+
+    /** Creates a dialect that keeps the default {@code fieldIde}. */
+    public KingbaseDialect() {}
+
+    /**
+     * Creates a dialect with an explicit field identifier mode.
+     *
+     * @param fieldIde mode value stored on this dialect and used when quoting identifiers
+     */
+    public KingbaseDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
     @Override
     public String dialectName() {
         return DatabaseIdentifier.KINGBASE;
@@ -72,4 +81,32 @@ public class KingbaseDialect implements JdbcDialect {
     public TablePath parse(String tablePath) {
         return TablePath.of(tablePath, true);
     }
+
+    /**
+     * Builds the qualified table reference {@code <database>.<table>}.
+     *
+     * @param database database name, quoted via {@link #quoteDatabaseIdentifier(String)}
+     * @param tableName table name, quoted via {@link #quoteIdentifier(String)}
+     * @return both quoted parts joined by a dot
+     */
+    @Override
+    public String tableIdentifier(String database, String tableName) {
+        // Quote the database name explicitly so that a mixed-case name is still recognised.
+        String quotedDatabase = quoteDatabaseIdentifier(database);
+        String quotedTable = quoteIdentifier(tableName);
+        return quotedDatabase + "." + quotedTable;
+    }
+
+    /**
+     * Wraps an identifier in double quotes, applying the configured {@code fieldIde} mode.
+     *
+     * <p>A dotted identifier is split on {@code '.'}: every leading segment is quoted as-is and
+     * only the final segment is passed through {@code getFieldIde} before quoting. An identifier
+     * without a dot is passed through {@code getFieldIde} as a whole.
+     *
+     * @param identifier plain or dot-qualified name; must not be null
+     * @return the double-quoted identifier
+     */
+    @Override
+    public String quoteIdentifier(String identifier) {
+        if (!identifier.contains(".")) {
+            return "\"" + getFieldIde(identifier, fieldIde) + "\"";
+        }
+
+        String[] segments = identifier.split("\\.");
+        int lastIndex = segments.length - 1;
+        StringBuilder quoted = new StringBuilder();
+        for (int i = 0; i < lastIndex; i++) {
+            quoted.append('"').append(segments[i]).append("\".");
+        }
+        String lastSegment = getFieldIde(segments[lastIndex], fieldIde);
+        return quoted.append('"').append(lastSegment).append('"').toString();
+    }
+
+    /**
+     * Wraps a database name in double quotes without passing it through {@code getFieldIde}.
+     *
+     * @param identifier database name
+     * @return the name surrounded by double quotes
+     */
+    @Override
+    public String quoteDatabaseIdentifier(String identifier) {
+        return '"' + identifier + '"';
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
index ad6c7d311b..cc6d2b5d38 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 
 import com.google.auto.service.AutoService;
 
+import javax.annotation.Nonnull;
+
 /** Factory for {@link KingbaseDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class KingbaseDialectFactory implements JdbcDialectFactory {
@@ -41,4 +43,9 @@ public class KingbaseDialectFactory implements 
JdbcDialectFactory {
     public JdbcDialect create() {
         return new KingbaseDialect();
     }
+
+    /**
+     * Creates a {@link KingbaseDialect} configured with the given field identifier mode.
+     *
+     * @param compatibleMode not used by this implementation
+     * @param fieldIde field identifier mode forwarded to the dialect constructor
+     * @return a new dialect instance
+     */
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+        return new KingbaseDialect(fieldIde);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java
index 3b922983b7..f46fccda93 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverter.java
@@ -23,11 +23,15 @@ import 
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter;
 
 import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
@@ -64,6 +68,113 @@ public class KingbaseTypeConverter extends 
PostgresTypeConverter {
 
             String kingbaseDataType = typeDefine.getDataType().toUpperCase();
             switch (kingbaseDataType) {
+                    // MySQL compatibility - only types not in 
PostgresTypeConverter
+                    // int not in PG (PG has SMALLINT/INTEGER/BIGINT)
+                case MySqlTypeConverter.MYSQL_SMALLINT_UNSIGNED:
+                case MySqlTypeConverter.MYSQL_MEDIUMINT:
+                case MySqlTypeConverter.MYSQL_MEDIUMINT_UNSIGNED:
+                case MySqlTypeConverter.MYSQL_INT:
+                case MySqlTypeConverter.MYSQL_INTEGER:
+                case MySqlTypeConverter.MYSQL_YEAR:
+                case MySqlTypeConverter.MYSQL_YEAR_UNSIGNED:
+                    builder.dataType(BasicType.INT_TYPE);
+                    break;
+                    // DATETIME not in PG (PG has TIMESTAMP)
+                case MySqlTypeConverter.MYSQL_DATETIME:
+                    builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                    if (typeDefine.getScale() != null
+                            && typeDefine.getScale() > MAX_TIMESTAMP_SCALE) {
+                        builder.scale(MAX_TIMESTAMP_SCALE);
+                        log.warn(
+                                "The timestamp column {} type timestamp({}) is 
out of range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to 
timestamp({})",
+                                typeDefine.getName(),
+                                typeDefine.getScale(),
+                                MAX_TIMESTAMP_SCALE,
+                                MAX_TIMESTAMP_SCALE);
+                    } else {
+                        builder.scale(typeDefine.getScale());
+                    }
+                    break;
+                    // Binary types not in PG (PG has BYTEA)
+                case MySqlTypeConverter.MYSQL_BINARY:
+                case MySqlTypeConverter.MYSQL_VARBINARY:
+                case MySqlTypeConverter.MYSQL_TINYBLOB:
+                case MySqlTypeConverter.MYSQL_MEDIUMBLOB:
+                case MySqlTypeConverter.MYSQL_LONGBLOB:
+                    builder.dataType(PrimitiveByteArrayType.INSTANCE);
+                    if (typeDefine.getLength() != null && 
typeDefine.getLength() > 0) {
+                        builder.columnLength(typeDefine.getLength());
+                    } else {
+                        builder.columnLength((long) (1024 * 1024 * 1024));
+                    }
+                    break;
+                    // Text types not in PG (PG has TEXT/VARCHAR/CHAR)
+                case MySqlTypeConverter.MYSQL_TINYTEXT:
+                case MySqlTypeConverter.MYSQL_MEDIUMTEXT:
+                case MySqlTypeConverter.MYSQL_LONGTEXT:
+                    builder.dataType(BasicType.STRING_TYPE);
+                    if (typeDefine.getLength() != null && 
typeDefine.getLength() > 0) {
+                        builder.columnLength(typeDefine.getLength());
+                    }
+                    break;
+                    // Oracle compatibility - Oracle specific types (not in 
PostgresTypeConverter)
+                    // NUMBER is Oracle-specific numeric type
+                case OracleTypeConverter.ORACLE_NUMBER:
+                    DecimalType oracleDecimal =
+                            new DecimalType(
+                                    typeDefine.getPrecision() == null
+                                            ? DEFAULT_PRECISION
+                                            : 
typeDefine.getPrecision().intValue(),
+                                    typeDefine.getScale() == null ? 0 : 
typeDefine.getScale());
+                    builder.dataType(oracleDecimal);
+                    builder.columnLength((long) oracleDecimal.getPrecision());
+                    builder.scale(oracleDecimal.getScale());
+                    break;
+                    // FLOAT is different from PG FLOAT
+                case OracleTypeConverter.ORACLE_FLOAT:
+                    DecimalType floatDecimal = new 
DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
+                    builder.dataType(floatDecimal);
+                    builder.columnLength((long) floatDecimal.getPrecision());
+                    builder.scale(floatDecimal.getScale());
+                    break;
+                    // Oracle string types (VARCHAR2, NVARCHAR2, NCHAR differ 
from PG)
+                case OracleTypeConverter.ORACLE_VARCHAR2:
+                case OracleTypeConverter.ORACLE_NVARCHAR2:
+                case OracleTypeConverter.ORACLE_NCHAR:
+                case OracleTypeConverter.ORACLE_LONG:
+                case OracleTypeConverter.ORACLE_ROWID:
+                case OracleTypeConverter.ORACLE_NCLOB:
+                case OracleTypeConverter.ORACLE_XML:
+                case OracleTypeConverter.ORACLE_SYS_XML:
+                    builder.dataType(BasicType.STRING_TYPE);
+                    if (typeDefine.getLength() != null && 
typeDefine.getLength() > 0) {
+                        builder.columnLength(typeDefine.getLength());
+                    } else {
+                        builder.columnLength((long) (1024 * 1024 * 1024));
+                    }
+                    break;
+                    // SQLServer compatibility - SQLServer specific types
+                case SqlServerTypeConverter.SQLSERVER_DATETIME2:
+                case SqlServerTypeConverter.SQLSERVER_SMALLDATETIME:
+                case SqlServerTypeConverter.SQLSERVER_DATETIMEOFFSET:
+                    builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+                    if (typeDefine.getScale() != null
+                            && typeDefine.getScale() > MAX_TIMESTAMP_SCALE) {
+                        builder.scale(MAX_TIMESTAMP_SCALE);
+                        log.warn(
+                                "The timestamp column {} type timestamp({}) is 
out of range, "
+                                        + "which exceeds the maximum scale of 
{}, "
+                                        + "it will be converted to 
timestamp({})",
+                                typeDefine.getName(),
+                                typeDefine.getScale(),
+                                MAX_TIMESTAMP_SCALE,
+                                MAX_TIMESTAMP_SCALE);
+                    } else {
+                        builder.scale(typeDefine.getScale());
+                    }
+                    break;
                 case KB_TINYINT:
                     builder.dataType(BasicType.BYTE_TYPE);
                     break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
index d62c81cb4f..4076a2303e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java
@@ -42,57 +42,57 @@ import lombok.extern.slf4j.Slf4j;
 public class MySqlTypeConverter implements 
TypeConverter<BasicTypeDefine<MysqlType>> {
 
     // ============================data types=====================
-    static final String MYSQL_NULL = "NULL";
-    static final String MYSQL_BIT = "BIT";
-    static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED";
+    public static final String MYSQL_NULL = "NULL";
+    public static final String MYSQL_BIT = "BIT";
+    public static final String MYSQL_BIT_UNSIGNED = "BIT UNSIGNED";
 
     // -------------------------number----------------------------
-    static final String MYSQL_TINYINT = "TINYINT";
-    static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
-    static final String MYSQL_SMALLINT = "SMALLINT";
-    static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
-    static final String MYSQL_MEDIUMINT = "MEDIUMINT";
-    static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
-    static final String MYSQL_INT = "INT";
-    static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
-    static final String MYSQL_INTEGER = "INTEGER";
-    static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
-    static final String MYSQL_BIGINT = "BIGINT";
-    static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
-    static final String MYSQL_DECIMAL = "DECIMAL";
-    static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
-    static final String MYSQL_FLOAT = "FLOAT";
-    static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
-    static final String MYSQL_DOUBLE = "DOUBLE";
-    static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
 
     // -------------------------string----------------------------
     public static final String MYSQL_CHAR = "CHAR";
     public static final String MYSQL_VARCHAR = "VARCHAR";
-    static final String MYSQL_TINYTEXT = "TINYTEXT";
-    static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
-    static final String MYSQL_TEXT = "TEXT";
-    static final String MYSQL_LONGTEXT = "LONGTEXT";
-    static final String MYSQL_JSON = "JSON";
-    static final String MYSQL_ENUM = "ENUM";
-    static final String MYSQL_SET = "SET";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+    public static final String MYSQL_ENUM = "ENUM";
+    public static final String MYSQL_SET = "SET";
 
     // ------------------------------time-------------------------
-    static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATE = "DATE";
     public static final String MYSQL_DATETIME = "DATETIME";
     public static final String MYSQL_TIME = "TIME";
     public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
-    static final String MYSQL_YEAR = "YEAR";
-    static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED";
+    public static final String MYSQL_YEAR = "YEAR";
+    public static final String MYSQL_YEAR_UNSIGNED = "YEAR UNSIGNED";
 
     // ------------------------------blob-------------------------
-    static final String MYSQL_TINYBLOB = "TINYBLOB";
-    static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
-    static final String MYSQL_BLOB = "BLOB";
-    static final String MYSQL_LONGBLOB = "LONGBLOB";
-    static final String MYSQL_BINARY = "BINARY";
-    static final String MYSQL_VARBINARY = "VARBINARY";
-    static final String MYSQL_GEOMETRY = "GEOMETRY";
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
 
     public static final int DEFAULT_PRECISION = 38;
     public static final int MAX_PRECISION = 65;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogTest.java
new file mode 100644
index 0000000000..9fda0abdd4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCatalogTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/**
+ * Manual integration test for {@link KingbaseCatalog} that talks to a live Kingbase instance.
+ *
+ * <p>The class is disabled by default because it needs a reachable database at the JDBC URL
+ * configured in {@link #before()}.
+ */
+@Disabled("Please Test it in your local environment")
+class KingbaseCatalogTest {
+
+    private static final String DATABASE = "test";
+    private static final String SCHEMA = "public";
+    private static final String SOURCE_TABLE = "st_type_converter_source";
+    private static final String TARGET_TABLE = "st_type_converter_target";
+
+    private static KingbaseCatalog catalog;
+
+    /** Creates and opens the catalog shared by every test in this class. */
+    @BeforeAll
+    static void before() {
+        catalog =
+                new KingbaseCatalog(
+                        "kingbase",
+                        "kingbase",
+                        "kingbase",
+                        JdbcUrlUtil.getUrlInfo("jdbc:kingbase8://192.168.102.101:54321/test"),
+                        null,
+                        null);
+        catalog.open();
+    }
+
+    /** Removes both test tables (target first) and closes the catalog. */
+    @AfterAll
+    static void after() {
+        dropTableIfExists(targetPath());
+        dropTableIfExists(sourcePath());
+        catalog.close();
+    }
+
+    @Test
+    void databaseExists() {
+        Assertions.assertTrue(catalog.databaseExists(DATABASE));
+    }
+
+    /**
+     * Creates the source table from raw DDL, reads it back through the catalog and uses the
+     * resulting {@link CatalogTable} to create the target table.
+     */
+    @Test
+    void createTableFromSource() {
+        TablePath source = sourcePath();
+        TablePath target = targetPath();
+
+        // Start from a clean slate in case an earlier run left tables behind.
+        dropTableIfExists(target);
+        dropTableIfExists(source);
+
+        catalog.executeSql(source, buildCreateTableSql(source));
+        Assertions.assertTrue(catalog.tableExists(source));
+
+        CatalogTable sourceDefinition = catalog.getTable(source);
+        catalog.createTable(target, sourceDefinition, true);
+        Assertions.assertTrue(catalog.tableExists(target));
+    }
+
+    /** Path of the table that is created from hand-written DDL. */
+    private static TablePath sourcePath() {
+        return TablePath.of(DATABASE, SCHEMA, SOURCE_TABLE);
+    }
+
+    /** Path of the table that the catalog derives from the source table. */
+    private static TablePath targetPath() {
+        return TablePath.of(DATABASE, SCHEMA, TARGET_TABLE);
+    }
+
+    /** Drops the table at {@code tablePath} when the catalog reports that it exists. */
+    private static void dropTableIfExists(TablePath tablePath) {
+        if (!catalog.tableExists(tablePath)) {
+            return;
+        }
+        catalog.dropTable(tablePath, true);
+    }
+
+    /** Renders a {@code CREATE TABLE} statement with one column per exercised data type. */
+    private static String buildCreateTableSql(TablePath tablePath) {
+        List<String> columnDefinitions =
+                Lists.newArrayList(
+                        "\"id\" BIGSERIAL PRIMARY KEY",
+                        "\"c_smallserial\" SMALLSERIAL",
+                        "\"c_serial\" SERIAL",
+                        "\"c_tinyint\" TINYINT",
+                        "\"c_bool\" BOOL",
+                        "\"c_int2\" INT2",
+                        "\"c_int4\" INT4",
+                        "\"c_int8\" INT8",
+                        "\"c_float4\" FLOAT4",
+                        "\"c_float8\" FLOAT8",
+                        "\"c_numeric\" NUMERIC(38,18)",
+                        "\"c_money\" MONEY",
+                        "\"c_bytea\" BYTEA",
+                        "\"c_blob\" BLOB",
+                        "\"c_clob\" CLOB",
+                        "\"c_bit\" BIT(16)",
+                        "\"c_char\" CHARACTER(10)",
+                        "\"c_bpchar\" BPCHAR(10)",
+                        "\"c_varchar\" VARCHAR(255)",
+                        "\"c_text\" TEXT",
+                        "\"c_date\" DATE",
+                        "\"c_time\" TIME",
+                        "\"c_timestamp\" TIMESTAMP",
+                        "\"c_timestamptz\" TIMESTAMPTZ",
+                        "\"c_uuid\" UUID",
+                        "\"c_json\" JSON",
+                        "\"c_jsonb\" JSONB");
+
+        StringBuilder ddl = new StringBuilder("CREATE TABLE ");
+        ddl.append(tablePath.getSchemaAndTableName("\"")).append(" (\n");
+        ddl.append(String.join(",\n", columnDefinitions)).append("\n);");
+        return ddl.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilderTest.java
new file mode 100644
index 0000000000..9d3b4cd229
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/kingbase/KingbaseCreateTableSqlBuilderTest.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+class KingbaseCreateTableSqlBuilderTest {
+
+    /**
+     * Compares the DDL generated for the table from {@code kingbaseCatalogTable} with the
+     * expected statement, once with the builder's boolean flag set to {@code true} and once with
+     * {@code false}.
+     *
+     * <p>In the first comparison everything matching {@code pk_id_\w+} is collapsed to
+     * {@code pk_id_} on both sides, so a generated name suffix does not affect the result.
+     */
+    @Test
+    void testBuildWithKingbaseCatalog() {
+        final String pkNamePattern = "pk_id_\\w+";
+        final String pkNamePrefix = "pk_id_";
+        TablePath path = TablePath.of("test", "public", "test_table");
+        CatalogTable table = kingbaseCatalogTable(path);
+
+        // Flag enabled: compare after normalising the primary-key constraint name.
+        String actualWithFlag = new KingbaseCreateTableSqlBuilder(table, true).build(path);
+        String expectedWithFlag = buildExpectedSql(path, true);
+        Assertions.assertEquals(
+                expectedWithFlag.replaceAll(pkNamePattern, pkNamePrefix),
+                actualWithFlag.replaceAll(pkNamePattern, pkNamePrefix));
+
+        // Flag disabled: the statements are compared verbatim.
+        String actualWithoutFlag = new KingbaseCreateTableSqlBuilder(table, false).build(path);
+        String expectedWithoutFlag = buildExpectedSql(path, false);
+        Assertions.assertEquals(expectedWithoutFlag, actualWithoutFlag);
+    }
+
+    /**
+     * Compares the DDL generated for the table from {@code otherCatalogTable} with the expected
+     * statement, once with the builder's boolean flag set to {@code true} and once with
+     * {@code false}.
+     *
+     * <p>In the first comparison everything matching {@code pk_id_\w+} is collapsed to
+     * {@code pk_id_} on both sides, so a generated name suffix does not affect the result.
+     */
+    @Test
+    void testBuildWithOtherCatalog() {
+        final String pkNamePattern = "pk_id_\\w+";
+        final String pkNamePrefix = "pk_id_";
+        TablePath path = TablePath.of("test_database", "public", "st_type_converter_test");
+        CatalogTable table = otherCatalogTable(path);
+
+        // Flag enabled: compare after normalising the primary-key constraint name.
+        String actualWithFlag = new KingbaseCreateTableSqlBuilder(table, true).build(path);
+        String expectedWithFlag = buildExpectedSqlFromOtherCatalog(path, true);
+        Assertions.assertEquals(
+                expectedWithFlag.replaceAll(pkNamePattern, pkNamePrefix),
+                actualWithFlag.replaceAll(pkNamePattern, pkNamePrefix));
+
+        // Flag disabled: the statements are compared verbatim.
+        String actualWithoutFlag = new KingbaseCreateTableSqlBuilder(table, false).build(path);
+        String expectedWithoutFlag = buildExpectedSqlFromOtherCatalog(path, false);
+        Assertions.assertEquals(expectedWithoutFlag, actualWithoutFlag);
+    }
+
+    private CatalogTable kingbaseCatalogTable(TablePath tablePath) {
+        List<Column> columns =
+                Lists.newArrayList(
+                        PhysicalColumn.of(
+                                "id",
+                                BasicType.LONG_TYPE,
+                                null,
+                                false,
+                                null,
+                                "id",
+                                "BIGSERIAL",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_smallserial",
+                                BasicType.SHORT_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_smallserial",
+                                "SMALLSERIAL",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_serial",
+                                BasicType.INT_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_serial",
+                                "SERIAL",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_tinyint",
+                                BasicType.BYTE_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_tinyint",
+                                "TINYINT",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_bool",
+                                BasicType.BOOLEAN_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_bool",
+                                "BOOL",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_int2",
+                                BasicType.SHORT_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_int2",
+                                "INT2",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_int4",
+                                BasicType.INT_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_int4",
+                                "INT4",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_int8",
+                                BasicType.LONG_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_int8",
+                                "INT8",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_float4",
+                                BasicType.FLOAT_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_float4",
+                                "FLOAT4",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_float8",
+                                BasicType.DOUBLE_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_float8",
+                                "FLOAT8",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_numeric",
+                                new DecimalType(38, 18),
+                                38L,
+                                18,
+                                true,
+                                null,
+                                "c_numeric",
+                                "NUMERIC(38,18)",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_money",
+                                new DecimalType(38, 18),
+                                38L,
+                                18,
+                                true,
+                                null,
+                                "c_money",
+                                "MONEY",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_bytea",
+                                PrimitiveByteArrayType.INSTANCE,
+                                null,
+                                true,
+                                null,
+                                "c_bytea",
+                                "BYTEA",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_blob",
+                                PrimitiveByteArrayType.INSTANCE,
+                                null,
+                                true,
+                                null,
+                                "c_blob",
+                                "BLOB",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_clob",
+                                BasicType.STRING_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_clob",
+                                "CLOB",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_bit",
+                                PrimitiveByteArrayType.INSTANCE,
+                                16L,
+                                true,
+                                null,
+                                "c_bit",
+                                "BIT(16)",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_char",
+                                BasicType.STRING_TYPE,
+                                10L,
+                                true,
+                                null,
+                                "c_char",
+                                "CHARACTER(10)",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_bpchar",
+                                BasicType.STRING_TYPE,
+                                10L,
+                                true,
+                                null,
+                                "c_bpchar",
+                                "BPCHAR(10)",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_varchar",
+                                BasicType.STRING_TYPE,
+                                255L,
+                                true,
+                                null,
+                                "c_varchar",
+                                "VARCHAR(255)",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_text",
+                                BasicType.STRING_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_text",
+                                "TEXT",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_date",
+                                LocalTimeType.LOCAL_DATE_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_date",
+                                "DATE",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_time",
+                                LocalTimeType.LOCAL_TIME_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_time",
+                                "TIME",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_timestamp",
+                                LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_timestamp",
+                                "TIMESTAMP",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_timestamptz",
+                                LocalTimeType.OFFSET_DATE_TIME_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_timestamptz",
+                                "TIMESTAMPTZ",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_uuid",
+                                BasicType.STRING_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_uuid",
+                                "UUID",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_json",
+                                BasicType.STRING_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_json",
+                                "JSON",
+                                Collections.emptyMap()),
+                        PhysicalColumn.of(
+                                "c_jsonb",
+                                BasicType.STRING_TYPE,
+                                null,
+                                true,
+                                null,
+                                "c_jsonb",
+                                "JSONB",
+                                Collections.emptyMap()));
+
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .columns(columns)
+                        .primaryKey(PrimaryKey.of("pk_id", 
Lists.newArrayList("id")))
+                        .build();
+
+        return CatalogTable.of(
+                TableIdentifier.of(DatabaseIdentifier.KINGBASE, tablePath),
+                tableSchema,
+                new HashMap<>(),
+                Lists.newArrayList(),
+                "test table");
+    }
+
+    /**
+     * Creates a fixture table whose identifier is tagged with {@link DatabaseIdentifier#MYSQL}.
+     *
+     * <p>Columns are declared with SeaTunnel types, lengths and comments only; no source type
+     * string is passed for any of them. {@code id} and {@code c_bool} are non-nullable, all other
+     * columns are nullable, and {@code id} is the only column of the primary key {@code pk_id}.
+     *
+     * @param tablePath path used to build the table identifier
+     * @return the fixture table, carrying the comment {@code "test table"}
+     */
+    private CatalogTable otherCatalogTable(TablePath tablePath) {
+        // Typed as Long so a null length resolves to the Long-length overload of PhysicalColumn.of.
+        Long noLength = null;
+
+        List<Column> columnList = Lists.newArrayList();
+        columnList.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, noLength, false, null, "id"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_bool", BasicType.BOOLEAN_TYPE, noLength, false, null, "c_bool"));
+        columnList.add(
+                PhysicalColumn.of("c_int2", BasicType.SHORT_TYPE, noLength, true, null, "c_int2"));
+        columnList.add(
+                PhysicalColumn.of("c_int4", BasicType.INT_TYPE, noLength, true, null, "c_int4"));
+        columnList.add(
+                PhysicalColumn.of("c_int8", BasicType.LONG_TYPE, noLength, true, null, "c_int8"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_float4", BasicType.FLOAT_TYPE, noLength, true, null, "c_float4"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_float8", BasicType.DOUBLE_TYPE, noLength, true, null, "c_float8"));
+        // Decimal column: precision 38 as the length, scale 18.
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_numeric", new DecimalType(38, 18), 38L, 18, true, null, "c_numeric"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_bytea",
+                        PrimitiveByteArrayType.INSTANCE,
+                        noLength,
+                        true,
+                        null,
+                        "c_bytea"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_varchar", BasicType.STRING_TYPE, 255L, true, null, "c_varchar"));
+        columnList.add(
+                PhysicalColumn.of("c_text", BasicType.STRING_TYPE, noLength, true, null, "c_text"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_date", LocalTimeType.LOCAL_DATE_TYPE, noLength, true, null, "c_date"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_time", LocalTimeType.LOCAL_TIME_TYPE, noLength, true, null, "c_time"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_timestamp",
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                        noLength,
+                        true,
+                        null,
+                        "c_timestamp"));
+        columnList.add(
+                PhysicalColumn.of(
+                        "c_timestamptz",
+                        LocalTimeType.OFFSET_DATE_TIME_TYPE,
+                        noLength,
+                        true,
+                        null,
+                        "c_timestamptz"));
+
+        PrimaryKey primaryKey = PrimaryKey.of("pk_id", Lists.newArrayList("id"));
+        TableSchema schema =
+                TableSchema.builder().columns(columnList).primaryKey(primaryKey).build();
+        TableIdentifier identifier = TableIdentifier.of(DatabaseIdentifier.MYSQL, tablePath);
+
+        return CatalogTable.of(
+                identifier, schema, new HashMap<>(), Lists.newArrayList(), "test table");
+    }
+
+    /**
+     * Assembles the expected CREATE TABLE script that uses upper-case type names such as {@code
+     * BIGSERIAL} and {@code NUMERIC(38,18)}.
+     *
+     * <p>The script is the CREATE TABLE statement, terminated by {@code ");\n"}, followed by one
+     * COMMENT ON COLUMN statement per column joined with {@code ";\n"}.
+     *
+     * @param tablePath table the statements refer to
+     * @param includePrimaryKey whether the {@code pk_id_} constraint line is appended to the column
+     *     definitions
+     * @return the expected SQL text
+     */
+    private String buildExpectedSql(TablePath tablePath, boolean includePrimaryKey) {
+        List<String> definitions =
+                Lists.newArrayList(
+                        "\"id\" BIGSERIAL NOT NULL",
+                        "\"c_smallserial\" SMALLSERIAL",
+                        "\"c_serial\" SERIAL",
+                        "\"c_tinyint\" TINYINT",
+                        "\"c_bool\" BOOL",
+                        "\"c_int2\" INT2",
+                        "\"c_int4\" INT4",
+                        "\"c_int8\" INT8",
+                        "\"c_float4\" FLOAT4",
+                        "\"c_float8\" FLOAT8",
+                        "\"c_numeric\" NUMERIC(38,18)",
+                        "\"c_money\" MONEY",
+                        "\"c_bytea\" BYTEA",
+                        "\"c_blob\" BLOB",
+                        "\"c_clob\" CLOB",
+                        "\"c_bit\" BIT(16)",
+                        "\"c_char\" CHARACTER(10)",
+                        "\"c_bpchar\" BPCHAR(10)",
+                        "\"c_varchar\" VARCHAR(255)",
+                        "\"c_text\" TEXT",
+                        "\"c_date\" DATE",
+                        "\"c_time\" TIME",
+                        "\"c_timestamp\" TIMESTAMP",
+                        "\"c_timestamptz\" TIMESTAMPTZ",
+                        "\"c_uuid\" UUID",
+                        "\"c_json\" JSON",
+                        "\"c_jsonb\" JSONB");
+        if (includePrimaryKey) {
+            definitions.add("CONSTRAINT pk_id_ PRIMARY KEY (\"id\")");
+        }
+
+        // One comment statement per column, in the same order as the definitions above.
+        List<String> commentedColumns =
+                Lists.newArrayList(
+                        "id",
+                        "c_smallserial",
+                        "c_serial",
+                        "c_tinyint",
+                        "c_bool",
+                        "c_int2",
+                        "c_int4",
+                        "c_int8",
+                        "c_float4",
+                        "c_float8",
+                        "c_numeric",
+                        "c_money",
+                        "c_bytea",
+                        "c_blob",
+                        "c_clob",
+                        "c_bit",
+                        "c_char",
+                        "c_bpchar",
+                        "c_varchar",
+                        "c_text",
+                        "c_date",
+                        "c_time",
+                        "c_timestamp",
+                        "c_timestamptz",
+                        "c_uuid",
+                        "c_json",
+                        "c_jsonb");
+        List<String> comments = Lists.newArrayList();
+        for (String columnName : commentedColumns) {
+            comments.add(commentSql(tablePath, columnName));
+        }
+
+        String createStatement =
+                "CREATE TABLE "
+                        + tablePath.getSchemaAndTableName("\"")
+                        + " (\n"
+                        + String.join(",\n", definitions)
+                        + "\n);\n";
+        return createStatement + String.join(";\n", comments);
+    }
+
+    /**
+     * Assembles the expected CREATE TABLE script that uses lower-case type names such as {@code
+     * int8} and {@code numeric(38,18)}.
+     *
+     * <p>The script is the CREATE TABLE statement, terminated by {@code ");\n"}, followed by one
+     * COMMENT ON COLUMN statement per column joined with {@code ";\n"}.
+     *
+     * @param tablePath table the statements refer to
+     * @param includePrimaryKey whether the {@code pk_id_} constraint line is appended to the column
+     *     definitions
+     * @return the expected SQL text
+     */
+    private String buildExpectedSqlFromOtherCatalog(
+            TablePath tablePath, boolean includePrimaryKey) {
+        List<String> definitions =
+                Lists.newArrayList(
+                        "\"id\" int8 NOT NULL",
+                        "\"c_bool\" bool NOT NULL",
+                        "\"c_int2\" int2",
+                        "\"c_int4\" int4",
+                        "\"c_int8\" int8",
+                        "\"c_float4\" float4",
+                        "\"c_float8\" float8",
+                        "\"c_numeric\" numeric(38,18)",
+                        "\"c_bytea\" bytea",
+                        "\"c_varchar\" varchar(255)",
+                        "\"c_text\" text",
+                        "\"c_date\" date",
+                        "\"c_time\" time",
+                        "\"c_timestamp\" timestamp",
+                        "\"c_timestamptz\" timestamptz");
+        if (includePrimaryKey) {
+            definitions.add("CONSTRAINT pk_id_ PRIMARY KEY (\"id\")");
+        }
+
+        // One comment statement per column, in the same order as the definitions above.
+        List<String> commentedColumns =
+                Lists.newArrayList(
+                        "id",
+                        "c_bool",
+                        "c_int2",
+                        "c_int4",
+                        "c_int8",
+                        "c_float4",
+                        "c_float8",
+                        "c_numeric",
+                        "c_bytea",
+                        "c_varchar",
+                        "c_text",
+                        "c_date",
+                        "c_time",
+                        "c_timestamp",
+                        "c_timestamptz");
+        List<String> comments = Lists.newArrayList();
+        for (String columnName : commentedColumns) {
+            comments.add(commentSql(tablePath, columnName));
+        }
+
+        String createStatement =
+                "CREATE TABLE "
+                        + tablePath.getSchemaAndTableName("\"")
+                        + " (\n"
+                        + String.join(",\n", definitions)
+                        + "\n);\n";
+        return createStatement + String.join(";\n", comments);
+    }
+
+    /**
+     * Renders the expected {@code COMMENT ON COLUMN} statement for one column, using the column
+     * name itself as the comment text.
+     *
+     * @param tablePath table that owns the column
+     * @param columnName column name, also used as the comment text
+     * @return the statement without a trailing semicolon
+     */
+    private String commentSql(TablePath tablePath, String columnName) {
+        StringBuilder statement = new StringBuilder("COMMENT ON COLUMN ");
+        statement.append(tablePath.getSchemaAndTableName("\""));
+        statement.append(".\"").append(columnName);
+        statement.append("\" IS '").append(columnName).append("'");
+        return statement.toString();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java
new file mode 100644
index 0000000000..c4be8aa731
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.container;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.kingbase.KingbaseCatalog;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+
+/**
+ * Base class for Kingbase Testcontainers-based unit tests. Provides shared Kingbase container setup
+ * and connection management.
+ *
+ * <p>A single container, JDBC {@link Connection} and opened {@link KingbaseCatalog} are created
+ * before all tests of a subclass and released after them.
+ *
+ * <p>NOTE: The license is baked into the image (liangyaobo/kingbase:v8r6-license). The license has
+ * a validity period of approximately one year. If the container fails to start with license-related
+ * errors, please replace the image with a newly built one that contains a valid license.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public abstract class AbstractKingbaseContainerTest {
+
+    protected static final String KINGBASE_IMAGE = "liangyaobo/kingbase:v8r6-license";
+    protected static final String USERNAME = "kingbase";
+    protected static final String PASSWORD = "kingbase";
+    protected static final String DATABASE = "test";
+    protected static final String SCHEMA = "public";
+    protected static final int KINGBASE_PORT = 54321;
+
+    protected static GenericContainer<?> kingbaseContainer;
+    protected static Connection connection;
+    protected static KingbaseCatalog catalog;
+
+    /**
+     * Starts the Kingbase container, opens a JDBC connection to it and opens the catalog under
+     * test.
+     *
+     * @throws SQLException if no JDBC connection could be obtained
+     */
+    @BeforeAll
+    public static void startContainer() throws SQLException {
+        DockerImageName imageName = DockerImageName.parse(KINGBASE_IMAGE);
+
+        kingbaseContainer =
+                new GenericContainer<>(imageName)
+                        .withExposedPorts(KINGBASE_PORT)
+                        .withEnv("SYSTEM_USER", USERNAME)
+                        .withEnv("SYSTEM_PWD", PASSWORD)
+                        .waitingFor(Wait.forListeningPort())
+                        .withStartupTimeout(Duration.ofMinutes(3));
+
+        kingbaseContainer.start();
+
+        String host = kingbaseContainer.getHost();
+        Integer mappedPort = kingbaseContainer.getMappedPort(KINGBASE_PORT);
+        String jdbcUrl = String.format("jdbc:kingbase8://%s:%d/%s", host, mappedPort, DATABASE);
+
+        // The wait strategy only checks the port, so the first connection attempts are retried.
+        connection = connectWithRetry(jdbcUrl, USERNAME, PASSWORD);
+
+        catalog =
+                new KingbaseCatalog(
+                        "kingbase",
+                        USERNAME,
+                        PASSWORD,
+                        JdbcUrlUtil.getUrlInfo(jdbcUrl),
+                        SCHEMA,
+                        null);
+        catalog.open();
+    }
+
+    /**
+     * Releases the catalog, the JDBC connection and the container, in that order.
+     *
+     * <p>The steps are chained with {@code finally} so that a failure while closing one resource
+     * does not prevent the remaining ones (in particular the container) from being released.
+     *
+     * @throws SQLException if closing the JDBC connection fails
+     */
+    @AfterAll
+    public static void stopContainer() throws SQLException {
+        try {
+            if (catalog != null) {
+                catalog.close();
+            }
+        } finally {
+            try {
+                if (connection != null && !connection.isClosed()) {
+                    connection.close();
+                }
+            } finally {
+                if (kingbaseContainer != null) {
+                    kingbaseContainer.stop();
+                }
+            }
+        }
+    }
+
+    /**
+     * Executes a single SQL statement on the shared connection.
+     *
+     * @param sql statement text to execute
+     * @throws SQLException if the statement cannot be created or executed
+     */
+    protected void executeSql(String sql) throws SQLException {
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(sql);
+        }
+    }
+
+    /**
+     * Opens a JDBC connection, retrying on any exception.
+     *
+     * @param jdbcUrl JDBC URL of the database
+     * @param username database user
+     * @param password database password
+     * @return the opened connection
+     * @throws SQLException the last SQL failure, or a wrapper around any other failure
+     */
+    private static Connection connectWithRetry(String jdbcUrl, String username, String password)
+            throws SQLException {
+        // NOTE(review): presumably 30 attempts with a 2000 ms backoff; confirm against RetryUtils.
+        RetryUtils.RetryMaterial retryMaterial =
+                new RetryUtils.RetryMaterial(30, true, exception -> true, 2000);
+        try {
+            return RetryUtils.retryWithException(
+                    () -> DriverManager.getConnection(jdbcUrl, username, password), retryMaterial);
+        } catch (SQLException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new SQLException("Failed to connect to Kingbase", e);
+        }
+    }
+
+    /**
+     * Wraps an identifier in double quotes, doubling any double quote it contains so the result is
+     * always a single well-formed quoted identifier.
+     *
+     * @param identifier raw identifier
+     * @return the quoted identifier
+     */
+    protected static String quoteIdentifier(String identifier) {
+        return "\"" + identifier.replace("\"", "\"\"") + "\"";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java
new file mode 100644
index 0000000000..936ddc42f4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.container;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for KingbaseCatalog using Testcontainers. Tests catalog 
operations like database
+ * listing, table operations, and schema management.
+ */
+@Slf4j
+@DisabledOnOs(OS.WINDOWS)
+public class KingbaseCatalogContainerTest extends 
AbstractKingbaseContainerTest {
+
+    @Test
+    public void testDatabaseExists() {
+        Assertions.assertTrue(catalog.databaseExists(DATABASE));
+    }
+
+    /**
+     * Creates a table with plain SQL and checks that the catalog can see it and load it.
+     *
+     * @throws SQLException if creating or dropping the table fails
+     */
+    @Test
+    public void testCreateAndGetTable() throws SQLException {
+        String testTableName = "test_catalog_table";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s (id BIGSERIAL PRIMARY KEY, name VARCHAR(100))",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+        executeSql(createTableSql);
+
+        try {
+            Assertions.assertTrue(catalog.tableExists(tablePath));
+
+            CatalogTable table = catalog.getTable(tablePath);
+            Assertions.assertNotNull(table);
+            Assertions.assertEquals(testTableName, table.getTableId().getTableName());
+        } finally {
+            // Drop even when an assertion fails so the shared container stays clean.
+            executeSql(
+                    String.format(
+                            "DROP TABLE %s.%s",
+                            quoteIdentifier(SCHEMA), quoteIdentifier(testTableName)));
+        }
+    }
+
+    /**
+     * Checks that {@code tableExists} is false before the table is created and true afterwards.
+     *
+     * @throws SQLException if creating or dropping the table fails
+     */
+    @Test
+    public void testTableExists() throws SQLException {
+        String testTableName = "test_exists_table";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        Assertions.assertFalse(catalog.tableExists(tablePath));
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s (id INT4)",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+        executeSql(createTableSql);
+
+        try {
+            Assertions.assertTrue(catalog.tableExists(tablePath));
+        } finally {
+            // Drop even when the assertion fails so the shared container stays clean.
+            executeSql(
+                    String.format(
+                            "DROP TABLE %s.%s",
+                            quoteIdentifier(SCHEMA), quoteIdentifier(testTableName)));
+        }
+    }
+
+    /**
+     * Creates a table through {@code catalog.createTable}, reads it back and drops it through the
+     * catalog again.
+     *
+     * @throws SQLException declared for parity with the other tests in this class
+     */
+    @Test
+    public void testCreateTableViaAPI() throws SQLException {
+        String testTableName = "test_api_create_table";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        TableSchema.Builder schemaBuilder = TableSchema.builder();
+        schemaBuilder.column(
+                PhysicalColumn.of(
+                        "id", BasicType.LONG_TYPE, (Long) null, false, null, "ID column"));
+        schemaBuilder.column(
+                PhysicalColumn.of("name", BasicType.STRING_TYPE, 100L, true, null, "Name column"));
+        schemaBuilder.primaryKey(PrimaryKey.of("pk_test", Arrays.asList("id")));
+
+        // Even with "kingbase" as the catalog name this is expected to work, because
+        // KingbaseCreateTableSqlBuilder checks isNotBlank(sourceType) and falls back to the
+        // type converter when sourceType is null.
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        org.apache.seatunnel.api.table.catalog.TableIdentifier.of(
+                                "kingbase", DATABASE, SCHEMA, testTableName),
+                        schemaBuilder.build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "");
+
+        try {
+            catalog.createTable(tablePath, catalogTable, false);
+
+            Assertions.assertTrue(catalog.tableExists(tablePath));
+
+            CatalogTable retrievedTable = catalog.getTable(tablePath);
+            Assertions.assertNotNull(retrievedTable);
+            Assertions.assertEquals(testTableName, retrievedTable.getTableId().getTableName());
+
+            catalog.dropTable(tablePath, false);
+            Assertions.assertFalse(catalog.tableExists(tablePath));
+        } finally {
+            // Safety net: remove the table if an assertion failed before the drop above ran.
+            catalog.dropTable(tablePath, true);
+        }
+    }
+
+    /**
+     * Creates a table with plain SQL and checks that {@code catalog.dropTable} removes it.
+     *
+     * @throws SQLException if creating the table fails
+     */
+    @Test
+    public void testDropTable() throws SQLException {
+        String testTableName = "test_drop_table";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s (id INT4)",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+        executeSql(createTableSql);
+
+        try {
+            Assertions.assertTrue(catalog.tableExists(tablePath));
+
+            catalog.dropTable(tablePath, false);
+
+            Assertions.assertFalse(catalog.tableExists(tablePath));
+        } finally {
+            // Safety net: remove the table if the test failed before or during the drop.
+            catalog.dropTable(tablePath, true);
+        }
+    }
+
+    /**
+     * Creates a table with 18 columns of assorted types and checks that the catalog loads it with
+     * at least that many columns.
+     *
+     * @throws SQLException if creating or dropping the table fails
+     */
+    @Test
+    public void testGetTableWithComplexTypes() throws SQLException {
+        String testTableName = "test_complex_types";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s ("
+                                + "id BIGSERIAL PRIMARY KEY, "
+                                + "c_smallserial SMALLSERIAL, "
+                                + "c_serial SERIAL, "
+                                + "c_bool BOOL, "
+                                + "c_int2 INT2, "
+                                + "c_int4 INT4, "
+                                + "c_int8 INT8, "
+                                + "c_float4 FLOAT4, "
+                                + "c_float8 FLOAT8, "
+                                + "c_numeric NUMERIC(38,18), "
+                                + "c_char CHARACTER(10), "
+                                + "c_varchar VARCHAR(255), "
+                                + "c_text TEXT, "
+                                + "c_date DATE, "
+                                + "c_time TIME, "
+                                + "c_timestamp TIMESTAMP, "
+                                + "c_timestamptz TIMESTAMPTZ, "
+                                + "c_bytea BYTEA"
+                                + ")",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+        executeSql(createTableSql);
+
+        try {
+            CatalogTable table = catalog.getTable(tablePath);
+            Assertions.assertNotNull(table);
+
+            TableSchema schema = table.getTableSchema();
+            List<Column> columns = schema.getColumns();
+            Assertions.assertTrue(columns.size() >= 18, "Should have at least 18 columns");
+        } finally {
+            // Drop even when an assertion fails so the shared container stays clean.
+            executeSql(
+                    String.format(
+                            "DROP TABLE %s.%s",
+                            quoteIdentifier(SCHEMA), quoteIdentifier(testTableName)));
+        }
+    }
+
+    /**
+     * Verifies that the catalog exposes the primary key of a table: the key must be present and
+     * its first column must be {@code id}.
+     *
+     * <p>The table is dropped in a {@code finally} block so that a failed assertion does not
+     * leave it behind for the next run.
+     */
+    @Test
+    public void testTableWithPrimaryKey() throws SQLException {
+        String testTableName = "test_primary_key_table";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s (id INT8 PRIMARY KEY, name VARCHAR(100))",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+
+        try {
+            executeSql(createTableSql);
+
+            CatalogTable table = catalog.getTable(tablePath);
+            Assertions.assertNotNull(table);
+
+            TableSchema schema = table.getTableSchema();
+            Assertions.assertNotNull(schema.getPrimaryKey());
+            Assertions.assertEquals("id", schema.getPrimaryKey().getColumnNames().get(0));
+        } finally {
+            // IF EXISTS keeps the cleanup from failing when CREATE TABLE itself did not succeed.
+            executeSql(
+                    String.format(
+                            "DROP TABLE IF EXISTS %s.%s",
+                            quoteIdentifier(SCHEMA), quoteIdentifier(testTableName)));
+        }
+    }
+
+    /**
+     * Reads a source table through the catalog, creates a target table from the returned
+     * definition, and checks that both tables report the same number of columns.
+     *
+     * <p>Both tables are removed before the test (in case an earlier run left them behind) and
+     * again in a {@code finally} block, so a failed assertion cannot leak them.
+     */
+    @Test
+    public void testCreateTableFromSource() throws SQLException {
+        String sourceTableName = "st_type_converter_source";
+        String targetTableName = "st_type_converter_target";
+        TablePath sourcePath = TablePath.of(DATABASE, SCHEMA, sourceTableName);
+        TablePath targetPath = TablePath.of(DATABASE, SCHEMA, targetTableName);
+
+        // Clean up if exists
+        if (catalog.tableExists(targetPath)) {
+            catalog.dropTable(targetPath, true);
+        }
+        if (catalog.tableExists(sourcePath)) {
+            catalog.dropTable(sourcePath, true);
+        }
+
+        // Create source table with various types
+        String createSourceSql =
+                String.format(
+                        "CREATE TABLE %s.%s ("
+                                + "id BIGSERIAL PRIMARY KEY, "
+                                + "c_int2 INT2, "
+                                + "c_int4 INT4, "
+                                + "c_int8 INT8, "
+                                + "c_float4 FLOAT4, "
+                                + "c_float8 FLOAT8, "
+                                + "c_numeric NUMERIC(38,18), "
+                                + "c_char CHARACTER(10), "
+                                + "c_varchar VARCHAR(255), "
+                                + "c_text TEXT, "
+                                + "c_date DATE, "
+                                + "c_timestamp TIMESTAMP"
+                                + ")",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(sourceTableName));
+
+        try {
+            executeSql(createSourceSql);
+            Assertions.assertTrue(catalog.tableExists(sourcePath));
+
+            // Get source table and create target from it
+            CatalogTable sourceTable = catalog.getTable(sourcePath);
+            catalog.createTable(targetPath, sourceTable, true);
+            Assertions.assertTrue(catalog.tableExists(targetPath));
+
+            // Verify target table structure
+            CatalogTable targetTable = catalog.getTable(targetPath);
+            Assertions.assertNotNull(targetTable);
+            Assertions.assertEquals(
+                    sourceTable.getTableSchema().getColumns().size(),
+                    targetTable.getTableSchema().getColumns().size());
+        } finally {
+            // Clean up; guarded the same way as the pre-test cleanup above because either table
+            // may be missing if the test failed early.
+            if (catalog.tableExists(targetPath)) {
+                catalog.dropTable(targetPath, true);
+            }
+            if (catalog.tableExists(sourcePath)) {
+                catalog.dropTable(sourcePath, true);
+            }
+        }
+    }
+
+    @Test
+    public void testColumnTypePreservation() throws SQLException {
+        String testTableName = "test_column_type_preservation";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+
+        // Create table with specific type lengths
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s ("
+                                + "id INT8 PRIMARY KEY, "
+                                + "c_varchar VARCHAR(255), "
+                                + "c_char CHAR(10), "
+                                + "c_numeric NUMERIC(38,18)"
+                                + ")",
+                        quoteIdentifier(SCHEMA), 
quoteIdentifier(testTableName));
+        executeSql(createTableSql);
+
+        CatalogTable table = catalog.getTable(tablePath);
+        Assertions.assertNotNull(table);
+
+        // Verify column types preserve full type info (VARCHAR(255), 
CHAR(10), NUMERIC(38,18))
+        List<Column> columns = table.getTableSchema().getColumns();
+        for (Column column : columns) {
+            String sourceType = column.getSourceType();
+            log.info("Column: {}, SourceType: {}", column.getName(), 
sourceType);
+            if ("c_varchar".equals(column.getName())) {
+                Assertions.assertTrue(
+                        sourceType.toLowerCase().contains("255")
+                                || 
sourceType.toLowerCase().contains("varchar"),
+                        "VARCHAR should preserve length info: " + sourceType);
+            } else if ("c_char".equals(column.getName())) {
+                Assertions.assertTrue(
+                        sourceType.toLowerCase().contains("10")
+                                || sourceType.toLowerCase().contains("char"),
+                        "CHAR should preserve length info: " + sourceType);
+            } else if ("c_numeric".equals(column.getName())) {
+                Assertions.assertTrue(
+                        sourceType.toLowerCase().contains("numeric")
+                                || sourceType.toLowerCase().contains("38"),
+                        "NUMERIC should preserve precision info: " + 
sourceType);
+            }
+        }
+
+        executeSql(
+                String.format(
+                        "DROP TABLE %s.%s",
+                        quoteIdentifier(SCHEMA), 
quoteIdentifier(testTableName)));
+    }
+
+    /**
+     * Verifies that a column comment containing a single quote can be read through the catalog
+     * and that a new table can be created from a definition carrying such a comment.
+     *
+     * <p>Source and target tables are dropped in a {@code finally} block so that a failed
+     * assertion does not leave them behind for the next run.
+     */
+    @Test
+    public void testColumnCommentWithSingleQuote() throws SQLException {
+        String testTableName = "test_comment_escape";
+        String targetTableName = "test_comment_escape_target";
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName);
+        TablePath targetPath = TablePath.of(DATABASE, SCHEMA, targetTableName);
+
+        // Create source table
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s (id INT8 PRIMARY KEY, name VARCHAR(100))",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+
+        // Add comment with single quote
+        String commentSql =
+                String.format(
+                        "COMMENT ON COLUMN %s.%s.name IS 'User''s name field'",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(testTableName));
+
+        try {
+            executeSql(createTableSql);
+            executeSql(commentSql);
+
+            CatalogTable table = catalog.getTable(tablePath);
+            Assertions.assertNotNull(table);
+
+            // Verify comment is retrieved correctly
+            Column nameColumn =
+                    table.getTableSchema().getColumns().stream()
+                            .filter(c -> "name".equals(c.getName()))
+                            .findFirst()
+                            .orElse(null);
+            Assertions.assertNotNull(nameColumn);
+            Assertions.assertNotNull(nameColumn.getComment());
+            log.info("Column comment: {}", nameColumn.getComment());
+
+            // Now test creating a new table from this one (tests the escape in SQL builder)
+            catalog.createTable(targetPath, table, true);
+            Assertions.assertTrue(catalog.tableExists(targetPath));
+        } finally {
+            // Clean up; guarded because either table may be missing if the test failed early.
+            if (catalog.tableExists(targetPath)) {
+                catalog.dropTable(targetPath, true);
+            }
+            if (catalog.tableExists(tablePath)) {
+                catalog.dropTable(tablePath, true);
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java
new file mode 100644
index 0000000000..29cec63763
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.container;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase.KingbaseDialect;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Optional;
+
+/**
+ * Unit tests for KingbaseDialect using Testcontainers. Tests dialect-specific 
functionality like
+ * quoting, SQL generation, and upsert statements.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public class KingbaseDialectContainerTest extends 
AbstractKingbaseContainerTest {
+
+    private static KingbaseDialect dialect;
+    private static final String TEST_TABLE = "dialect_test_table";
+
+    @BeforeAll
+    public static void setupDialect() throws SQLException {
+        dialect = new KingbaseDialect();
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE %s.%s ("
+                                + "id INT8 PRIMARY KEY, "
+                                + "name VARCHAR(100), "
+                                + "value NUMERIC(10,2), "
+                                + "created_at TIMESTAMP"
+                                + ")",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE));
+
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(createTableSql);
+        }
+
+        // Insert test data
+        String insertSql =
+                String.format(
+                        "INSERT INTO %s.%s (id, name, value, created_at) "
+                                + "VALUES (1, 'test1', 100.50, 
CURRENT_TIMESTAMP)",
+                        quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE));
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(insertSql);
+        }
+    }
+
+    @Test
+    public void testDialectName() {
+        Assertions.assertEquals(DatabaseIdentifier.KINGBASE, 
dialect.dialectName());
+    }
+
+    @Test
+    public void testQuoteIdentifier() {
+        // Test basic identifier
+        Assertions.assertEquals("\"table_name\"", 
dialect.quoteIdentifier("table_name"));
+        Assertions.assertEquals("\"COLUMN\"", 
dialect.quoteIdentifier("COLUMN"));
+
+        // Test identifier with dots (schema.table)
+        Assertions.assertEquals("\"schema\".\"table\"", 
dialect.quoteIdentifier("schema.table"));
+    }
+
+    @Test
+    public void testQuoteIdentifierWithFieldIde() {
+        // Test with fieldIde = UPPERCASE
+        KingbaseDialect dialectUpper = new 
KingbaseDialect(FieldIdeEnum.UPPERCASE.getValue());
+        Assertions.assertEquals("\"COLUMN_NAME\"", 
dialectUpper.quoteIdentifier("column_name"));
+
+        // Test with fieldIde = LOWERCASE
+        KingbaseDialect dialectLower = new 
KingbaseDialect(FieldIdeEnum.LOWERCASE.getValue());
+        Assertions.assertEquals("\"column_name\"", 
dialectLower.quoteIdentifier("COLUMN_NAME"));
+
+        // Test with fieldIde = ORIGINAL (default)
+        KingbaseDialect dialectOriginal = new 
KingbaseDialect(FieldIdeEnum.ORIGINAL.getValue());
+        Assertions.assertEquals("\"Column_Name\"", 
dialectOriginal.quoteIdentifier("Column_Name"));
+    }
+
+    @Test
+    public void testTableIdentifier() {
+        // Test with database and table
+        String identifier = dialect.tableIdentifier("mydb", "mytable");
+        Assertions.assertEquals("\"mydb\".\"mytable\"", identifier);
+    }
+
+    @Test
+    public void testQuoteDatabaseIdentifier() {
+        Assertions.assertEquals("\"testdb\"", 
dialect.quoteDatabaseIdentifier("testdb"));
+        Assertions.assertEquals("\"MyDatabase\"", 
dialect.quoteDatabaseIdentifier("MyDatabase"));
+    }
+
+    @Test
+    public void testParseTablePath() {
+        // Test parsing full table path
+        TablePath path1 = dialect.parse("database.schema.table");
+        Assertions.assertEquals("database", path1.getDatabaseName());
+        Assertions.assertEquals("schema", path1.getSchemaName());
+        Assertions.assertEquals("table", path1.getTableName());
+
+        // Test parsing simple table name
+        TablePath path2 = dialect.parse("table");
+        Assertions.assertNull(path2.getDatabaseName());
+        Assertions.assertEquals("table", path2.getTableName());
+    }
+
+    @Test
+    public void testGetUpsertStatement() {
+        String[] fieldNames = {"id", "name", "value", "created_at"};
+        String[] uniqueKeyFields = {"id"};
+
+        Optional<String> upsertSqlOptional =
+                dialect.getUpsertStatement(SCHEMA, TEST_TABLE, fieldNames, 
uniqueKeyFields);
+
+        Assertions.assertTrue(upsertSqlOptional.isPresent());
+        String upsertSql = upsertSqlOptional.get();
+
+        // Verify the SQL contains expected parts
+        Assertions.assertTrue(upsertSql.contains("INSERT INTO"));
+        Assertions.assertTrue(upsertSql.contains("ON CONFLICT"));
+        Assertions.assertTrue(upsertSql.contains("DO UPDATE SET"));
+        Assertions.assertTrue(upsertSql.contains("EXCLUDED"));
+    }
+
+    @Test
+    public void testGetInsertIntoStatement() {
+        String[] fieldNames = {"id", "name", "value"};
+
+        String insertSql = dialect.getInsertIntoStatement(SCHEMA, TEST_TABLE, 
fieldNames);
+
+        Assertions.assertNotNull(insertSql);
+        Assertions.assertTrue(insertSql.contains("INSERT INTO"));
+        Assertions.assertTrue(insertSql.contains("\"id\""));
+        Assertions.assertTrue(insertSql.contains("\"name\""));
+        Assertions.assertTrue(insertSql.contains("\"value\""));
+    }
+
+    @Test
+    public void testGetUpdateStatement() {
+        String[] fieldNames = {"name", "value"};
+        String[] conditionFields = {"id"};
+
+        String updateSql =
+                dialect.getUpdateStatement(SCHEMA, TEST_TABLE, fieldNames, 
conditionFields, false);
+
+        Assertions.assertNotNull(updateSql);
+        Assertions.assertTrue(updateSql.contains("UPDATE"));
+        Assertions.assertTrue(updateSql.contains("SET"));
+        Assertions.assertTrue(updateSql.contains("WHERE"));
+    }
+
+    @Test
+    public void testGetDeleteStatement() {
+        String[] conditionFields = {"id"};
+
+        String deleteSql = dialect.getDeleteStatement(SCHEMA, TEST_TABLE, 
conditionFields);
+
+        Assertions.assertNotNull(deleteSql);
+        Assertions.assertTrue(deleteSql.contains("DELETE FROM"));
+        Assertions.assertTrue(deleteSql.contains("WHERE"));
+    }
+
+    @Test
+    public void testGetRowExistsStatement() {
+        String[] conditionFields = {"id"};
+
+        String existsSql = dialect.getRowExistsStatement(SCHEMA, TEST_TABLE, 
conditionFields);
+
+        Assertions.assertNotNull(existsSql);
+        Assertions.assertTrue(existsSql.contains("SELECT 1 FROM"));
+        Assertions.assertTrue(existsSql.contains("WHERE"));
+    }
+
+    @Test
+    public void testRealUpsertExecution() throws SQLException {
+        String testTable = "test_upsert_execution";
+
+        try {
+            // Create test table
+            String createTableSql =
+                    String.format(
+                            "CREATE TABLE %s.%s ("
+                                    + "id INT8 PRIMARY KEY, "
+                                    + "name VARCHAR(100), "
+                                    + "value INT4"
+                                    + ")",
+                            quoteIdentifier(SCHEMA), 
quoteIdentifier(testTable));
+            executeSql(createTableSql);
+
+            // Insert first row
+            String insertSql =
+                    String.format(
+                            "INSERT INTO %s.%s (id, name, value) VALUES (1, 
'first', 100)",
+                            quoteIdentifier(SCHEMA), 
quoteIdentifier(testTable));
+            executeSql(insertSql);
+
+            // Verify insert
+            try (Statement stmt = connection.createStatement();
+                    ResultSet rs =
+                            stmt.executeQuery(
+                                    String.format(
+                                            "SELECT COUNT(*) FROM %s.%s",
+                                            quoteIdentifier(SCHEMA), 
quoteIdentifier(testTable)))) {
+                rs.next();
+                Assertions.assertEquals(1, rs.getInt(1));
+            }
+
+            // Generate upsert SQL
+            String[] fieldNames = {"id", "name", "value"};
+            String[] uniqueKeyFields = {"id"};
+            Optional<String> upsertSqlOptional =
+                    dialect.getUpsertStatement(SCHEMA, testTable, fieldNames, 
uniqueKeyFields);
+
+            Assertions.assertTrue(upsertSqlOptional.isPresent());
+            String upsertSql = upsertSqlOptional.get();
+
+            // Verify the generated SQL structure
+            Assertions.assertTrue(upsertSql.contains("INSERT INTO"));
+            Assertions.assertTrue(upsertSql.contains("ON CONFLICT"));
+            Assertions.assertTrue(upsertSql.contains("DO UPDATE SET"));
+
+        } finally {
+            // Cleanup
+            try {
+                executeSql(
+                        String.format(
+                                "DROP TABLE IF EXISTS %s.%s",
+                                quoteIdentifier(SCHEMA), 
quoteIdentifier(testTable)));
+            } catch (SQLException e) {
+                // Ignore cleanup errors
+            }
+        }
+    }
+
+    @Test
+    public void testGetRowConverter() {
+        Assertions.assertNotNull(dialect.getRowConverter());
+        Assertions.assertEquals(
+                "KingbaseJdbcRowConverter", 
dialect.getRowConverter().getClass().getSimpleName());
+    }
+
+    @Test
+    public void testGetJdbcDialectTypeMapper() {
+        Assertions.assertNotNull(dialect.getJdbcDialectTypeMapper());
+        Assertions.assertEquals(
+                "KingbaseTypeMapper",
+                dialect.getJdbcDialectTypeMapper().getClass().getSimpleName());
+    }
+
+    @Test
+    public void testFieldIdeHandling() {
+        // Test with ORIGINAL (default)
+        String original = dialect.getFieldIde("ColumnName", 
FieldIdeEnum.ORIGINAL.getValue());
+        Assertions.assertEquals("ColumnName", original);
+
+        // Test with UPPERCASE
+        String upper = dialect.getFieldIde("ColumnName", 
FieldIdeEnum.UPPERCASE.getValue());
+        Assertions.assertEquals("COLUMNNAME", upper);
+
+        // Test with LOWERCASE
+        String lower = dialect.getFieldIde("ColumnName", 
FieldIdeEnum.LOWERCASE.getValue());
+        Assertions.assertEquals("columnname", lower);
+    }
+
+    @Test
+    public void testCreatPreparedStatement() throws SQLException {
+        PreparedStatement ps = null;
+        try {
+            String sql =
+                    String.format(
+                            "SELECT * FROM %s.%s",
+                            quoteIdentifier(SCHEMA), 
quoteIdentifier(TEST_TABLE));
+            ps = dialect.creatPreparedStatement(connection, sql, 100);
+
+            Assertions.assertNotNull(ps);
+            Assertions.assertEquals(100, ps.getFetchSize());
+        } finally {
+            if (ps != null) {
+                ps.close();
+            }
+        }
+    }
+
+    @Test
+    public void testTableIdentifierWithTablePath() {
+        TablePath tablePath = TablePath.of(DATABASE, SCHEMA, TEST_TABLE);
+        String identifier = dialect.tableIdentifier(tablePath);
+
+        Assertions.assertTrue(identifier.contains(SCHEMA));
+        Assertions.assertTrue(identifier.contains(TEST_TABLE));
+    }
+}

Reply via email to