This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new eb54fdd52c [Fix][Paimon] nullable and comment attribute was lost during automatic table creation (#9020) eb54fdd52c is described below commit eb54fdd52c82a146ad2ae1741cff541fd39ed29c Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com> AuthorDate: Mon Mar 24 10:59:11 2025 +0800 [Fix][Paimon] nullable and comment attribute was lost during automatic table creation (#9020) --- .../jdbc/catalog/AbstractJdbcCatalog.java | 18 ++- .../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 15 +++ .../jdbc/catalog/utils/CatalogUtilsTest.java | 2 +- .../seatunnel/paimon/catalog/PaimonCatalog.java | 19 ++- .../AlterPaimonTableSchemaEventHandler.java | 18 ++- .../seatunnel/paimon/utils/RowTypeConverter.java | 8 +- .../seatunnel/paimon/utils/SchemaUtil.java | 13 +- .../paimon/catalog/PaimonWithCommentTest.java | 140 +++++++++++++++++++++ .../paimon/utils/RowTypeConverterTest.java | 24 +++- .../e2e/connector/paimon/AbstractPaimonIT.java | 5 +- .../e2e/connector/paimon/PaimonSinkCDCIT.java | 2 - .../paimon/PaimonSinkWithSchemaEvolutionIT.java | 2 + 12 files changed, 244 insertions(+), 22 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index a1d82cbe2f..bf94fc811b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -211,6 +211,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { Connection conn = getConnection(dbUrl); try { DatabaseMetaData metaData = conn.getMetaData(); + Optional<String> comment = getTableComment(metaData, tablePath); Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, tablePath); List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, tablePath); TableSchema.Builder tableSchemaBuilder = @@ -225,7 +226,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { tableSchemaBuilder.build(), buildConnectorOptions(tablePath), Collections.emptyList(), - "", + comment.orElse(""), catalogName); } catch (SeaTunnelRuntimeException e) { @@ -283,6 +284,21 @@ public abstract class AbstractJdbcCatalog implements Catalog { return CatalogUtils.getPrimaryKey(metaData, TablePath.of(database, schema, table)); } + protected Optional<String> getTableComment(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + return getTableComment( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + } + + protected Optional<String> getTableComment( + DatabaseMetaData metaData, String database, String schema, String table) + throws SQLException { + return CatalogUtils.getTableComment(metaData, TablePath.of(database, schema, table)); + } + protected List<ConstraintKey> getConstraintKeys(DatabaseMetaData metaData, TablePath tablePath) throws SQLException { return getConstraintKeys( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index 070ef670af..d6f3c4c975 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -101,6 +101,21 @@ public class CatalogUtils { return getFieldIde(identifier, fieldIde); } + public static Optional<String> getTableComment(DatabaseMetaData metaData, TablePath tablePath) + throws SQLException { + try (ResultSet rs = + metaData.getTables( + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + new String[] {"TABLE"})) { + if (rs.next()) { + return Optional.ofNullable(rs.getString("REMARKS")); + } + } + return Optional.empty(); + } + public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, TablePath tablePath) throws SQLException { // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java index 7fb8741f05..f4e132e734 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java @@ -52,7 +52,7 @@ public class CatalogUtilsTest { } @Test - void testGetCommentWithJdbcDialectTypeMapper() throws SQLException { + void testGetTableCommentWithJdbcDialectTypeMapper() throws SQLException { TableSchema tableSchema = CatalogUtils.getTableSchema( new TestDatabaseMetaData(), diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index a14a250cf2..00d162a237 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -163,7 +163,9 @@ public class PaimonCatalog implements Catalog, PaimonTable { try { Schema paimonSchema = SchemaUtil.toPaimonSchema( - table.getTableSchema(), new PaimonSinkConfig(readonlyConfig)); + table.getTableSchema(), + new PaimonSinkConfig(readonlyConfig), + table.getComment()); catalog.createTable(toIdentifier(tablePath), paimonSchema, ignoreIfExists); } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) { throw new TableAlreadyExistException(this.catalogName, tablePath); @@ -276,7 +278,7 @@ public class PaimonCatalog implements Catalog, PaimonTable { builder.build(), paimonFileStoreTableTable.options(), partitionKeys, - null, + paimonFileStoreTableTable.comment().orElse(null), catalogName); } @@ -343,4 +345,17 @@ public class PaimonCatalog implements Catalog, PaimonTable { throw new CatalogException("ColumnNotExistException: {}", e); } } + + public void alterTable( + Identifier identifier, List<SchemaChange> schemaChanges, boolean ignoreIfNotExists) { + try { + catalog.alterTable(identifier, schemaChanges, true); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new CatalogException("TableNotExistException: {}", e); + } catch (org.apache.paimon.catalog.Catalog.ColumnAlreadyExistException e) { + throw new CatalogException("ColumnAlreadyExistException: {}", e); + } catch (org.apache.paimon.catalog.Catalog.ColumnNotExistException e) { + throw new CatalogException("ColumnNotExistException: {}", e); + } + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java index 1872641c1e..687103474a 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/schema/handler/AlterPaimonTableSchemaEventHandler.java @@ -41,6 +41,9 @@ import org.apache.paimon.utils.Preconditions; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; + import static org.apache.seatunnel.connectors.seatunnel.paimon.sink.schema.UpdatedDataFields.canConvert; @Slf4j @@ -95,13 +98,16 @@ public class AlterPaimonTableSchemaEventHandler { ? null : SchemaChange.Move.after(column.getName(), afterColumnName); BasicTypeDefine<DataType> reconvertColumn = PaimonTypeMapper.INSTANCE.reconvert(column); - SchemaChange schemaChange = + DataType nativeType = reconvertColumn.getNativeType(); + List<SchemaChange> schemaChanges = new ArrayList<>(); + schemaChanges.add( SchemaChange.addColumn( - column.getName(), - reconvertColumn.getNativeType(), - column.getComment(), - move); - paimonCatalog.alterTable(identifier, schemaChange, false); + column.getName(), nativeType.copy(true), column.getComment(), move)); + if (!nativeType.isNullable()) { + schemaChanges.add( + SchemaChange.updateColumnType(column.getName(), nativeType.copy(false))); + } + paimonCatalog.alterTable(identifier, schemaChanges, false); } else if (event instanceof AlterTableDropColumnEvent) { String columnName = ((AlterTableDropColumnEvent) event).getColumn(); paimonCatalog.alterTable(identifier, SchemaChange.dropColumn(columnName), true); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java index ca5a87726f..20dedb7e9a 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverter.java @@ -284,7 +284,7 @@ public class RowTypeConverter { int timestampScale = Objects.isNull(scale) ? TimestampType.DEFAULT_PRECISION : scale; TimestampType timestampType = DataTypes.TIMESTAMP(timestampScale); - builder.nativeType(timestampType); + builder.nativeType(timestampType.copy(column.isNullable())); builder.dataType(timestampType.getTypeRoot().name()); builder.columnType(timestampType.toString()); builder.scale(timestampScale); @@ -293,7 +293,7 @@ public class RowTypeConverter { case TIME: int timeScale = Objects.isNull(scale) ? TimeType.DEFAULT_PRECISION : scale; TimeType timeType = DataTypes.TIME(timeScale); - builder.nativeType(timeType); + builder.nativeType(timeType.copy(column.isNullable())); builder.columnType(timeType.toString()); builder.dataType(timeType.getTypeRoot().name()); builder.scale(timeScale); @@ -356,7 +356,7 @@ public class RowTypeConverter { } DecimalType paimonDecimalType = DataTypes.DECIMAL(precision, scale); - builder.nativeType(paimonDecimalType); + builder.nativeType(paimonDecimalType.copy(column.isNullable())); builder.columnType(paimonDecimalType.toString()); builder.dataType(paimonDecimalType.getTypeRoot().name()); builder.scale(scale); @@ -364,7 +364,7 @@ public class RowTypeConverter { builder.length(column.getColumnLength()); return builder.build(); default: - builder.nativeType(visit(column.getName(), dataType)); + builder.nativeType(visit(column.getName(), dataType).copy(column.isNullable())); builder.columnType(dataType.toString()); builder.length(column.getColumnLength()); builder.dataType(dataType.getSqlType().name()); diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java index ca825a269f..84f936a584 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnecto import org.apache.paimon.CoreOptions; import org.apache.paimon.schema.Schema; +import org.apache.paimon.shade.org.apache.commons.lang.StringUtils; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -44,11 +45,16 @@ public class SchemaUtil { } public static Schema toPaimonSchema( - TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig) { + TableSchema tableSchema, PaimonSinkConfig paimonSinkConfig, String comment) { Schema.Builder paiSchemaBuilder = Schema.newBuilder(); for (int i = 0; i < tableSchema.getColumns().size(); i++) { Column column = tableSchema.getColumns().get(i); - paiSchemaBuilder.column(column.getName(), toPaimonType(column)); + if (StringUtils.isNotBlank(column.getComment())) { + paiSchemaBuilder.column( + column.getName(), toPaimonType(column), column.getComment()); + } else { + paiSchemaBuilder.column(column.getName(), toPaimonType(column)); + } } List<String> primaryKeys = paimonSinkConfig.getPrimaryKeys(); if (primaryKeys.isEmpty() && Objects.nonNull(tableSchema.getPrimaryKey())) { @@ -69,6 +75,9 @@ public class SchemaUtil { if (!writeProps.isEmpty()) { paiSchemaBuilder.options(writeProps); } + if (StringUtils.isNotBlank(comment)) { + paiSchemaBuilder.comment(comment); + } return paiSchemaBuilder.build(); } diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java new file mode 100644 index 0000000000..e84f15258b --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonWithCommentTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.FileStoreTable; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class PaimonWithCommentTest { + + private PaimonCatalog paimonCatalog; + private TableSchema.Builder schemaBuilder; + private final String CATALOG_NAME = "paimon_catalog"; + private final String DATABASE_NAME = "default"; + private final String TABLE_NAME = "test_with_comment"; + private final String warehousePath = "/tmp/paimon"; + private Catalog catalog; + + @BeforeEach + public void before() { + Map<String, Object> properties = new HashMap<>(); + properties.put("warehouse", warehousePath); + properties.put("plugin_name", "Paimon"); + properties.put("database", DATABASE_NAME); + properties.put("table", TABLE_NAME); + Map<String, String> writeProps = new HashMap<>(); + writeProps.put("bucket", "1"); + properties.put("paimon.table.write-props", writeProps); + ReadonlyConfig config = ReadonlyConfig.fromMap(properties); + CatalogContext catalogContext = CatalogContext.create(new Path(warehousePath)); + catalog = CatalogFactory.createCatalog(catalogContext); + paimonCatalog = new PaimonCatalog(CATALOG_NAME, config); + paimonCatalog.open(); + paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), true); + this.schemaBuilder = + TableSchema.builder() + .column( + PhysicalColumn.of( + "c_string", + BasicType.STRING_TYPE, + (Long) null, + true, + null, + "c_string")) + .column( + PhysicalColumn.of( + "c_int", + BasicType.INT_TYPE, + (Long) null, + false, + null, + "c_int")) + .column( + PhysicalColumn.of( + "c_bigint", + BasicType.LONG_TYPE, + (Long) null, + false, + null, + "c_bigint")); + } + + @Test + public void testCreateTableWithCommentAndNullable() throws Catalog.TableNotExistException { + TableSchema tableSchema = + schemaBuilder + .primaryKey(PrimaryKey.of("pk", Collections.singletonList("c_int"))) + .build(); + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, TABLE_NAME), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + "test table"); + paimonCatalog.createTable( + TablePath.of(DATABASE_NAME, null, TABLE_NAME), catalogTable, true); + + FileStoreTable table = + (FileStoreTable) catalog.getTable(Identifier.create(DATABASE_NAME, TABLE_NAME)); + Assertions.assertEquals("test table", table.comment().get()); + table.schema() + .fields() + .forEach( + field -> { + Assertions.assertEquals(field.name(), field.description()); + if (field.name().equals("c_string")) { + Assertions.assertTrue(field.type().isNullable()); + } else { + Assertions.assertFalse(field.type().isNullable()); + } + }); + } + + @AfterEach + public void after() { + paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), false); + paimonCatalog.close(); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java index fe1214929b..0d29320768 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowTypeConverterTest.java @@ -54,6 +54,8 @@ public class RowTypeConverterTest { private Column column; + private Column columnNotNull; + private TableSchema tableSchema; public static final RowType DEFAULT_ROW_TYPE = @@ -187,15 +189,27 @@ public class RowTypeConverterTest { column = PhysicalColumn.builder() - .name("c_decimal") + .name("c_decimal_null") .sourceType(DataTypes.DECIMAL(30, 8).toString()) - .nullable(false) + .nullable(true) .dataType(dataType) .columnLength(30L) .defaultValue(3.0) .scale(8) .comment("c_decimal_type_define") .build(); + + columnNotNull = + PhysicalColumn.builder() + .name("c_decimal_not_null") + .sourceType(DataTypes.DECIMAL(30, 8).toString()) + .nullable(false) + .dataType(dataType) + .columnLength(30L) + .defaultValue(3.0) + .scale(8) + .comment("c_decimal_not_null") + .build(); } @Test @@ -227,6 +241,12 @@ public class RowTypeConverterTest { public void seaTunnelColumnToPaimonDataType() { BasicTypeDefine<DataType> dataTypeDefine = RowTypeConverter.reconvert(column); isEquals(column, dataTypeDefine); + Assertions.assertTrue(dataTypeDefine.isNullable()); + Assertions.assertTrue(dataTypeDefine.getNativeType().isNullable()); + BasicTypeDefine<DataType> dataTypeDefineNotNull = RowTypeConverter.reconvert(columnNotNull); + isEquals(columnNotNull, dataTypeDefineNotNull); + Assertions.assertFalse(dataTypeDefineNotNull.isNullable()); + Assertions.assertFalse(dataTypeDefineNotNull.getNativeType().isNullable()); } private void isEquals(Column column, BasicTypeDefine<DataType> dataTypeDefine) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java index 98323e9df1..d5dd8608a5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/AbstractPaimonIT.java @@ -47,8 +47,9 @@ public abstract class AbstractPaimonIT extends TestSuiteBase { protected static final String FAKE_DATABASE1 = "FakeDatabase1"; protected static final String FAKE_TABLE2 = "FakeTable1"; protected static final String FAKE_DATABASE2 = "FakeDatabase2"; - protected String CATALOG_ROOT_DIR_WIN = "C:/Users/"; - protected String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; + private final String CATALOG_ROOT_DIR_WIN = + "C:/Users/" + System.getProperty("user.name") + "/tmp/"; + private final String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; protected boolean isWindows; protected boolean changeLogEnabled = false; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index cb947fba9e..9e2af9bd4e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -69,8 +69,6 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT implements TestResource { public void startUp() throws Exception { this.isWindows = System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); - CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + System.getProperty("user.name") + "/tmp/"; - CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/"; } @AfterAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java index d82ef45278..5c85b29863 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkWithSchemaEvolutionIT.java @@ -134,6 +134,8 @@ public class PaimonSinkWithSchemaEvolutionIT extends AbstractPaimonIT implements @BeforeAll @Override public void startUp() throws Exception { + this.isWindows = + System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS"); log.info("The second stage: Starting Mysql containers..."); Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join(); log.info("Mysql Containers are started");