This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eb54fdd52c [Fix][Paimon] nullable and comment attribute was lost during automatic table creation (#9020)
eb54fdd52c is described below

commit eb54fdd52c82a146ad2ae1741cff541fd39ed29c
Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com>
AuthorDate: Mon Mar 24 10:59:11 2025 +0800

    [Fix][Paimon] nullable and comment attribute was lost during automatic table creation (#9020)
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  18 ++-
 .../seatunnel/jdbc/catalog/utils/CatalogUtils.java |  15 +++
 .../jdbc/catalog/utils/CatalogUtilsTest.java       |   2 +-
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  19 ++-
 .../AlterPaimonTableSchemaEventHandler.java        |  18 ++-
 .../seatunnel/paimon/utils/RowTypeConverter.java   |   8 +-
 .../seatunnel/paimon/utils/SchemaUtil.java         |  13 +-
 .../paimon/catalog/PaimonWithCommentTest.java      | 140 +++++++++++++++++++++
 .../paimon/utils/RowTypeConverterTest.java         |  24 +++-
 .../e2e/connector/paimon/AbstractPaimonIT.java     |   5 +-
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |   2 -
 .../paimon/PaimonSinkWithSchemaEvolutionIT.java    |   2 +
 12 files changed, 244 insertions(+), 22 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index a1d82cbe2f..bf94fc811b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -211,6 +211,7 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         Connection conn = getConnection(dbUrl);
         try {
             DatabaseMetaData metaData = conn.getMetaData();
+            Optional<String> comment = getTableComment(metaData, tablePath);
             Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, 
tablePath);
             List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, 
tablePath);
             TableSchema.Builder tableSchemaBuilder =
@@ -225,7 +226,7 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                     tableSchemaBuilder.build(),
                     buildConnectorOptions(tablePath),
                     Collections.emptyList(),
-                    "",
+                    comment.orElse(""),
                     catalogName);
 
         } catch (SeaTunnelRuntimeException e) {
@@ -283,6 +284,21 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         return CatalogUtils.getPrimaryKey(metaData, TablePath.of(database, 
schema, table));
     }
 
+    protected Optional<String> getTableComment(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        return getTableComment(
+                metaData,
+                tablePath.getDatabaseName(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName());
+    }
+
+    protected Optional<String> getTableComment(
+            DatabaseMetaData metaData, String database, String schema, String 
table)
+            throws SQLException {
+        return CatalogUtils.getTableComment(metaData, TablePath.of(database, 
schema, table));
+    }
+
     protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, 
TablePath tablePath)
             throws SQLException {
         return getConstraintKeys(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index 070ef670af..d6f3c4c975 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -101,6 +101,21 @@ public class CatalogUtils {
         return getFieldIde(identifier, fieldIde);
     }
 
+    public static Optional<String> getTableComment(DatabaseMetaData metaData, 
TablePath tablePath)
+            throws SQLException {
+        try (ResultSet rs =
+                metaData.getTables(
+                        tablePath.getDatabaseName(),
+                        tablePath.getSchemaName(),
+                        tablePath.getTableName(),
+                        new String[] {"TABLE"})) {
+            if (rs.next()) {
+                return Optional.ofNullable(rs.getString("REMARKS"));
+            }
+        }
+        return Optional.empty();
+    }
+
     public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData 
metaData, TablePath tablePath)
             throws SQLException {
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
index 7fb8741f05..f4e132e734 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java
@@ -52,7 +52,7 @@ public class CatalogUtilsTest {
     }
 
     @Test
-    void testGetCommentWithJdbcDialectTypeMapper() throws SQLException {
+    void testGetTableCommentWithJdbcDialectTypeMapper() throws SQLException {
         TableSchema tableSchema =
                 CatalogUtils.getTableSchema(
                         new TestDatabaseMetaData(),
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index a14a250cf2..00d162a237 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -163,7 +163,9 @@ public class PaimonCatalog implements Catalog, PaimonTable {
         try {
             Schema paimonSchema =
                     SchemaUtil.toPaimonSchema(
-                            table.getTableSchema(), new 
PaimonSinkConfig(readonlyConfig));
+                            table.getTableSchema(),
+                            new PaimonSinkConfig(readonlyConfig),
+                            table.getComment());
             catalog.createTable(toIdentifier(tablePath), paimonSchema, 
ignoreIfExists);
         } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException 
e) {
             throw new TableAlreadyExistException(this.catalogName, tablePath);
@@ -276,7 +278,7 @@ public class PaimonCatalog implements Catalog, PaimonTable {
                 builder.build(),
                 paimonFileStoreTableTable.options(),
                 partitionKeys,
-                null,
+                paimonFileStoreTableTable.comment().orElse(null),
                 catalogName);
     }
 
@@ -343,4 +345,17 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
             throw new CatalogException("ColumnNotExistException: {}", e);
         }
     }
+
+    public void alterTable(
+            Identifier identifier, List<SchemaChange> schemaChanges, boolean 
ignoreIfNotExists) {
+        try {
+            catalog.alterTable(identifier, schemaChanges, true);
+        } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+            throw new CatalogException("TableNotExistException: {}", e);
+        } catch (org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException 
e) {
+            throw new CatalogException("ColumnAlreadyExistException: {}", e);
+        } catch (org.apache.paimon.catalog.Catalog.ColumnNotExistException e) {
+            throw new CatalogException("ColumnNotExistException: {}", e);
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
index 1872641c1e..687103474a 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java
@@ -41,6 +41,9 @@ import org.apache.paimon.utils.Preconditions;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static 
org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.UpdatedDataFields.canConvert;
 
 @Slf4j
@@ -95,13 +98,16 @@ public class AlterPaimonTableSchemaEventHandler {
                             ? null
                             : SchemaChange.Move.after(column.getName(), 
afterColumnName);
             BasicTypeDefine<DataType> reconvertColumn = 
PaimonTypeMapper.INSTANCE.reconvert(column);
-            SchemaChange schemaChange =
+            DataType nativeType = reconvertColumn.getNativeType();
+            List<SchemaChange> schemaChanges = new ArrayList<>();
+            schemaChanges.add(
                     SchemaChange.addColumn(
-                            column.getName(),
-                            reconvertColumn.getNativeType(),
-                            column.getComment(),
-                            move);
-            paimonCatalog.alterTable(identifier, schemaChange, false);
+                            column.getName(), nativeType.copy(true), 
column.getComment(), move));
+            if (!nativeType.isNullable()) {
+                schemaChanges.add(
+                        SchemaChange.updateColumnType(column.getName(), 
nativeType.copy(false)));
+            }
+            paimonCatalog.alterTable(identifier, schemaChanges, false);
         } else if (event instanceof AlterTableDropColumnEvent) {
             String columnName = ((AlterTableDropColumnEvent) 
event).getColumn();
             paimonCatalog.alterTable(identifier, 
SchemaChange.dropColumn(columnName), true);
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
index ca5a87726f..20dedb7e9a 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java
@@ -284,7 +284,7 @@ public class RowTypeConverter {
                     int timestampScale =
                             Objects.isNull(scale) ? 
TimestampType.DEFAULT_PRECISION : scale;
                     TimestampType timestampType = 
DataTypes.TIMESTAMP(timestampScale);
-                    builder.nativeType(timestampType);
+                    
builder.nativeType(timestampType.copy(column.isNullable()));
                     builder.dataType(timestampType.getTypeRoot().name());
                     builder.columnType(timestampType.toString());
                     builder.scale(timestampScale);
@@ -293,7 +293,7 @@ public class RowTypeConverter {
                 case TIME:
                     int timeScale = Objects.isNull(scale) ? 
TimeType.DEFAULT_PRECISION : scale;
                     TimeType timeType = DataTypes.TIME(timeScale);
-                    builder.nativeType(timeType);
+                    builder.nativeType(timeType.copy(column.isNullable()));
                     builder.columnType(timeType.toString());
                     builder.dataType(timeType.getTypeRoot().name());
                     builder.scale(timeScale);
@@ -356,7 +356,7 @@ public class RowTypeConverter {
                     }
 
                     DecimalType paimonDecimalType = 
DataTypes.DECIMAL(precision, scale);
-                    builder.nativeType(paimonDecimalType);
+                    
builder.nativeType(paimonDecimalType.copy(column.isNullable()));
                     builder.columnType(paimonDecimalType.toString());
                     builder.dataType(paimonDecimalType.getTypeRoot().name());
                     builder.scale(scale);
@@ -364,7 +364,7 @@ public class RowTypeConverter {
                     builder.length(column.getColumnLength());
                     return builder.build();
                 default:
-                    builder.nativeType(visit(column.getName(), dataType));
+                    builder.nativeType(visit(column.getName(), 
dataType).copy(column.isNullable()));
                     builder.columnType(dataType.toString());
                     builder.length(column.getColumnLength());
                     builder.dataType(dataType.getSqlType().name());
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index ca825a269f..84f936a584 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.shade.org.apache.commons.lang.StringUtils;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 
@@ -44,11 +45,16 @@ public class SchemaUtil {
     }
 
     public static Schema toPaimonSchema(
-            TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) {
+            TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig, String 
comment) {
         Schema.Builder paiSchemaBuilder = Schema.newBuilder();
         for (int i = 0; i < tableSchema.getColumns().size(); i++) {
             Column column = tableSchema.getColumns().get(i);
-            paiSchemaBuilder.column(column.getName(), toPaimonType(column));
+            if (StringUtils.isNotBlank(column.getComment())) {
+                paiSchemaBuilder.column(
+                        column.getName(), toPaimonType(column), 
column.getComment());
+            } else {
+                paiSchemaBuilder.column(column.getName(), 
toPaimonType(column));
+            }
         }
         List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys();
         if (primaryKeys.isEmpty() && 
Objects.nonNull(tableSchema.getPrimaryKey())) {
@@ -69,6 +75,9 @@ public class SchemaUtil {
         if (!writeProps.isEmpty()) {
             paiSchemaBuilder.options(writeProps);
         }
+        if (StringUtils.isNotBlank(comment)) {
+            paiSchemaBuilder.comment(comment);
+        }
         return paiSchemaBuilder.build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java
new file mode 100644
index 0000000000..e84f15258b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class PaimonWithCommentTest {
+
+    private PaimonCatalog paimonCatalog;
+    private TableSchema.Builder schemaBuilder;
+    private final String CATALOG_NAME = "paimon_catalog";
+    private final String DATABASE_NAME = "default";
+    private final String TABLE_NAME = "test_with_comment";
+    private final String warehousePath = "/tmp/paimon";
+    private Catalog catalog;
+
+    @BeforeEach
+    public void before() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("warehouse", warehousePath);
+        properties.put("plugin_name", "Paimon");
+        properties.put("database", DATABASE_NAME);
+        properties.put("table", TABLE_NAME);
+        Map<String, String> writeProps = new HashMap<>();
+        writeProps.put("bucket", "1");
+        properties.put("paimon.table.write-props", writeProps);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+        CatalogContext catalogContext = CatalogContext.create(new 
Path(warehousePath));
+        catalog = CatalogFactory.createCatalog(catalogContext);
+        paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+        paimonCatalog.open();
+        paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), 
true);
+        this.schemaBuilder =
+                TableSchema.builder()
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_string",
+                                        BasicType.STRING_TYPE,
+                                        (Long) null,
+                                        true,
+                                        null,
+                                        "c_string"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_int",
+                                        BasicType.INT_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_int"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_bigint",
+                                        BasicType.LONG_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_bigint"));
+    }
+
+    @Test
+    public void testCreateTableWithCommentAndNullable() throws 
Catalog.TableNotExistException {
+        TableSchema tableSchema =
+                schemaBuilder
+                        .primaryKey(PrimaryKey.of("pk", 
Collections.singletonList("c_int")))
+                        .build();
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, 
TABLE_NAME),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "test table");
+        paimonCatalog.createTable(
+                TablePath.of(DATABASE_NAME, null, TABLE_NAME), catalogTable, 
true);
+
+        FileStoreTable table =
+                (FileStoreTable) 
catalog.getTable(Identifier.create(DATABASE_NAME, TABLE_NAME));
+        Assertions.assertEquals("test table", table.comment().get());
+        table.schema()
+                .fields()
+                .forEach(
+                        field -> {
+                            Assertions.assertEquals(field.name(), 
field.description());
+                            if (field.name().equals("c_string")) {
+                                
Assertions.assertTrue(field.type().isNullable());
+                            } else {
+                                
Assertions.assertFalse(field.type().isNullable());
+                            }
+                        });
+    }
+
+    @AfterEach
+    public void after() {
+        paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), 
false);
+        paimonCatalog.close();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
index fe1214929b..0d29320768 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java
@@ -54,6 +54,8 @@ public class RowTypeConverterTest {
 
     private Column column;
 
+    private Column columnNotNull;
+
     private TableSchema tableSchema;
 
     public static final RowType DEFAULT_ROW_TYPE =
@@ -187,15 +189,27 @@ public class RowTypeConverterTest {
 
         column =
                 PhysicalColumn.builder()
-                        .name("c_decimal")
+                        .name("c_decimal_null")
                         .sourceType(DataTypes.DECIMAL(30, 8).toString())
-                        .nullable(false)
+                        .nullable(true)
                         .dataType(dataType)
                         .columnLength(30L)
                         .defaultValue(3.0)
                         .scale(8)
                         .comment("c_decimal_type_define")
                         .build();
+
+        columnNotNull =
+                PhysicalColumn.builder()
+                        .name("c_decimal_not_null")
+                        .sourceType(DataTypes.DECIMAL(30, 8).toString())
+                        .nullable(false)
+                        .dataType(dataType)
+                        .columnLength(30L)
+                        .defaultValue(3.0)
+                        .scale(8)
+                        .comment("c_decimal_not_null")
+                        .build();
     }
 
     @Test
@@ -227,6 +241,12 @@ public class RowTypeConverterTest {
     public void seaTunnelColumnToPaimonDataType() {
         BasicTypeDefine<DataType> dataTypeDefine = 
RowTypeConverter.reconvert(column);
         isEquals(column, dataTypeDefine);
+        Assertions.assertTrue(dataTypeDefine.isNullable());
+        Assertions.assertTrue(dataTypeDefine.getNativeType().isNullable());
+        BasicTypeDefine<DataType> dataTypeDefineNotNull = 
RowTypeConverter.reconvert(columnNotNull);
+        isEquals(columnNotNull, dataTypeDefineNotNull);
+        Assertions.assertFalse(dataTypeDefineNotNull.isNullable());
+        
Assertions.assertFalse(dataTypeDefineNotNull.getNativeType().isNullable());
     }
 
     private void isEquals(Column column, BasicTypeDefine<DataType> 
dataTypeDefine) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
index 98323e9df1..d5dd8608a5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java
@@ -47,8 +47,9 @@ public abstract class AbstractPaimonIT extends TestSuiteBase {
     protected static final String FAKE_DATABASE1 = "FakeDatabase1";
     protected static final String FAKE_TABLE2 = "FakeTable1";
     protected static final String FAKE_DATABASE2 = "FakeDatabase2";
-    protected String CATALOG_ROOT_DIR_WIN = "C:/Users/";
-    protected String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
+    private final String CATALOG_ROOT_DIR_WIN =
+            "C:/Users/" + System.getProperty("user.name") + "/tmp/";
+    private final String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + 
"/";
     protected boolean isWindows;
     protected boolean changeLogEnabled = false;
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index cb947fba9e..9e2af9bd4e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -69,8 +69,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT 
implements TestResource {
     public void startUp() throws Exception {
         this.isWindows =
                 
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
-        CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + 
System.getProperty("user.name") + "/tmp/";
-        CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
     }
 
     @AfterAll
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
index d82ef45278..5c85b29863 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java
@@ -134,6 +134,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends 
AbstractPaimonIT implements
     @BeforeAll
     @Override
     public void startUp() throws Exception {
+        this.isWindows =
+                
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
         log.info("The second stage: Starting Mysql containers...");
         Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
         log.info("Mysql Containers are started");

Reply via email to