This is an automated email from the ASF dual-hosted git repository. gaojun2048 pushed a commit to branch apache_240124_fix_jdbc_sink in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 7769f64bc33ba0e1f2ef58c1aa7ae28b9d851c82 Author: Eric <gaojun2...@gmail.com> AuthorDate: Wed Jan 24 11:58:50 2024 +0800 Fix Jdbc sink target table name error --- .../connectors/seatunnel/jdbc/sink/JdbcSink.java | 8 +- .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 188 ++++++++++----------- 2 files changed, 98 insertions(+), 98 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index af95537bd2..0616871811 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -185,10 +185,10 @@ public class JdbcSink : fieldIdeEnumEnum.getValue(); TablePath tablePath = TablePath.of( - jdbcSinkConfig.getDatabase() - + "." - + CatalogUtils.quoteTableIdentifier( - jdbcSinkConfig.getTable(), fieldIde)); + catalogTable.getTableId().getDatabaseName(), + catalogTable.getTableId().getSchemaName(), + CatalogUtils.quoteTableIdentifier( + catalogTable.getTableId().getTableName(), fieldIde)); catalogTable.getOptions().put("fieldIde", fieldIde); return Optional.of( new DefaultSaveModeHandler( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index 4fe88e669f..21970fdc37 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -102,112 +102,112 @@ public class JdbcSinkFactory implements TableSinkFactory { Optional<String> queryOptional = config.getOptional(QUERY); if (!optionalTable.isPresent() && !queryOptional.isPresent()) { optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY); - // get source table relevant information - TableIdentifier tableId = catalogTable.getTableId(); - String sourceDatabaseName = tableId.getDatabaseName(); - String sourceSchemaName = tableId.getSchemaName(); - String sourceTableName = tableId.getTableName(); - // get sink table relevant information - String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY); - String sinkTableNameBefore = optionalTable.get(); - String[] sinkTableSplitArray = sinkTableNameBefore.split("\\."); - String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1]; - String sinkSchemaName; - if (sinkTableSplitArray.length > 1) { - sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2]; - } else { - sinkSchemaName = null; - } - if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) { - sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA); - } - // to add tablePrefix and tableSuffix - String tempTableName; - String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX); - String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX); - if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) { - tempTableName = - StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName; - tempTableName = - StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName; + } + // get source table relevant information + TableIdentifier tableId = catalogTable.getTableId(); + String sourceDatabaseName = tableId.getDatabaseName(); + String sourceSchemaName = tableId.getSchemaName(); + String sourceTableName = tableId.getTableName(); + // get sink table relevant information + String sinkDatabaseName = optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY); + String sinkTableNameBefore = optionalTable.get(); + String[] sinkTableSplitArray = sinkTableNameBefore.split("\\."); + String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length - 1]; + String sinkSchemaName; + if (sinkTableSplitArray.length > 1) { + sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 2]; + } else { + sinkSchemaName = null; + } + if (StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) { + sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA); + } + // to add tablePrefix and tableSuffix + String tempTableName; + String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX); + String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX); + if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) { + tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + sinkTableName : sinkTableName; + tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + suffix : tempTableName; - } else { - tempTableName = sinkTableName; - } - // to replace - String finalDatabaseName = sinkDatabaseName; - if (StringUtils.isNotEmpty(sourceDatabaseName)) { - finalDatabaseName = - sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName); - } + } else { + tempTableName = sinkTableName; + } + // to replace + String finalDatabaseName = sinkDatabaseName; + if (StringUtils.isNotEmpty(sourceDatabaseName)) { + finalDatabaseName = + sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, sourceDatabaseName); + } - String finalSchemaName; - if (sinkSchemaName != null) { - if (sourceSchemaName == null) { - finalSchemaName = sinkSchemaName; - } else { - finalSchemaName = - sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName); - } + String finalSchemaName; + if (sinkSchemaName != null) { + if (sourceSchemaName == null) { + finalSchemaName = sinkSchemaName; } else { - finalSchemaName = null; - } - String finalTableName = sinkTableName; - if (StringUtils.isNotEmpty(sourceTableName)) { - finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName); + finalSchemaName = sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName); } + } else { + finalSchemaName = null; + } + String finalTableName = sinkTableName; + if (StringUtils.isNotEmpty(sourceTableName)) { + finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, sourceTableName); + } - // rebuild TableIdentifier and catalogTable - TableIdentifier newTableId = - TableIdentifier.of( - tableId.getCatalogName(), - finalDatabaseName, - finalSchemaName, - finalTableName); - catalogTable = - CatalogTable.of( - newTableId, - catalogTable.getTableSchema(), - catalogTable.getOptions(), - catalogTable.getPartitionKeys(), - catalogTable.getComment(), - catalogTable.getCatalogName()); - Map<String, String> map = config.toMap(); - if (catalogTable.getTableId().getSchemaName() != null) { + // rebuild TableIdentifier and catalogTable + TableIdentifier newTableId = + TableIdentifier.of( + tableId.getCatalogName(), + finalDatabaseName, + finalSchemaName, + finalTableName); + catalogTable = + CatalogTable.of( + newTableId, + catalogTable.getTableSchema(), + catalogTable.getOptions(), + catalogTable.getPartitionKeys(), + catalogTable.getComment(), + catalogTable.getCatalogName()); + Map<String, String> map = config.toMap(); + if (catalogTable.getTableId().getSchemaName() != null) { + map.put( + TABLE.key(), + catalogTable.getTableId().getSchemaName() + + "." + + catalogTable.getTableId().getTableName()); + } else { + map.put(TABLE.key(), catalogTable.getTableId().getTableName()); + } + map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName()); + PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + if (primaryKey != null && !CollectionUtils.isEmpty(primaryKey.getColumnNames())) { + map.put(PRIMARY_KEYS.key(), String.join(",", primaryKey.getColumnNames())); + } else { + Optional<ConstraintKey> keyOptional = + catalogTable.getTableSchema().getConstraintKeys().stream() + .filter( + key -> + ConstraintKey.ConstraintType.UNIQUE_KEY.equals( + key.getConstraintType())) + .findFirst(); + if (keyOptional.isPresent()) { map.put( - TABLE.key(), - catalogTable.getTableId().getSchemaName() - + "." - + catalogTable.getTableId().getTableName()); - } else { - map.put(TABLE.key(), catalogTable.getTableId().getTableName()); - } - map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName()); - PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey(); - if (primaryKey != null && !CollectionUtils.isEmpty(primaryKey.getColumnNames())) { - map.put(PRIMARY_KEYS.key(), String.join(",", primaryKey.getColumnNames())); - } else { - Optional<ConstraintKey> keyOptional = - catalogTable.getTableSchema().getConstraintKeys().stream() - .filter( - key -> - ConstraintKey.ConstraintType.UNIQUE_KEY.equals( - key.getConstraintType())) - .findFirst(); - if (keyOptional.isPresent()) { - map.put( - PRIMARY_KEYS.key(), - keyOptional.get().getColumnNames().stream() - .map(key -> key.getColumnName()) - .collect(Collectors.joining(","))); - } + PRIMARY_KEYS.key(), + keyOptional.get().getColumnNames().stream() + .map(key -> key.getColumnName()) + .collect(Collectors.joining(","))); } - config = ReadonlyConfig.fromMap(new HashMap<>(map)); } + config = ReadonlyConfig.fromMap(new HashMap<>(map)); // always execute final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE); + catalogTable + .getOptions() + .put("fieldIde", fieldIdeEnum == null ? null : fieldIdeEnum.getValue()); JdbcDialect dialect = JdbcDialectLoader.load( sinkConfig.getJdbcConnectionConfig().getUrl(),