This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch apache_240124_fix_jdbc_sink
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

commit 7769f64bc33ba0e1f2ef58c1aa7ae28b9d851c82
Author: Eric <gaojun2...@gmail.com>
AuthorDate: Wed Jan 24 11:58:50 2024 +0800

    Fix Jdbc sink target table name error
---
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |   8 +-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       | 188 ++++++++++-----------
 2 files changed, 98 insertions(+), 98 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index af95537bd2..0616871811 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -185,10 +185,10 @@ public class JdbcSink
                                     : fieldIdeEnumEnum.getValue();
                     TablePath tablePath =
                             TablePath.of(
-                                    jdbcSinkConfig.getDatabase()
-                                            + "."
-                                            + 
CatalogUtils.quoteTableIdentifier(
-                                                    jdbcSinkConfig.getTable(), 
fieldIde));
+                                    
catalogTable.getTableId().getDatabaseName(),
+                                    catalogTable.getTableId().getSchemaName(),
+                                    CatalogUtils.quoteTableIdentifier(
+                                            
catalogTable.getTableId().getTableName(), fieldIde));
                     catalogTable.getOptions().put("fieldIde", fieldIde);
                     return Optional.of(
                             new DefaultSaveModeHandler(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 4fe88e669f..21970fdc37 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -102,112 +102,112 @@ public class JdbcSinkFactory implements 
TableSinkFactory {
         Optional<String> queryOptional = config.getOptional(QUERY);
         if (!optionalTable.isPresent() && !queryOptional.isPresent()) {
             optionalTable = Optional.of(REPLACE_TABLE_NAME_KEY);
-            // get source table relevant information
-            TableIdentifier tableId = catalogTable.getTableId();
-            String sourceDatabaseName = tableId.getDatabaseName();
-            String sourceSchemaName = tableId.getSchemaName();
-            String sourceTableName = tableId.getTableName();
-            // get sink table relevant information
-            String sinkDatabaseName = 
optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
-            String sinkTableNameBefore = optionalTable.get();
-            String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
-            String sinkTableName = 
sinkTableSplitArray[sinkTableSplitArray.length - 1];
-            String sinkSchemaName;
-            if (sinkTableSplitArray.length > 1) {
-                sinkSchemaName = 
sinkTableSplitArray[sinkTableSplitArray.length - 2];
-            } else {
-                sinkSchemaName = null;
-            }
-            if 
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
-                sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
-            }
-            // to add tablePrefix and tableSuffix
-            String tempTableName;
-            String prefix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
-            String suffix = 
catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
-            if (StringUtils.isNotEmpty(prefix) || 
StringUtils.isNotEmpty(suffix)) {
-                tempTableName =
-                        StringUtils.isNotEmpty(prefix) ? prefix + 
sinkTableName : sinkTableName;
-                tempTableName =
-                        StringUtils.isNotEmpty(suffix) ? tempTableName + 
suffix : tempTableName;
+        }
+        // get source table relevant information
+        TableIdentifier tableId = catalogTable.getTableId();
+        String sourceDatabaseName = tableId.getDatabaseName();
+        String sourceSchemaName = tableId.getSchemaName();
+        String sourceTableName = tableId.getTableName();
+        // get sink table relevant information
+        String sinkDatabaseName = 
optionalDatabase.orElse(REPLACE_DATABASE_NAME_KEY);
+        String sinkTableNameBefore = optionalTable.get();
+        String[] sinkTableSplitArray = sinkTableNameBefore.split("\\.");
+        String sinkTableName = sinkTableSplitArray[sinkTableSplitArray.length 
- 1];
+        String sinkSchemaName;
+        if (sinkTableSplitArray.length > 1) {
+            sinkSchemaName = sinkTableSplitArray[sinkTableSplitArray.length - 
2];
+        } else {
+            sinkSchemaName = null;
+        }
+        if 
(StringUtils.isNotBlank(catalogOptions.get(JdbcCatalogOptions.SCHEMA))) {
+            sinkSchemaName = catalogOptions.get(JdbcCatalogOptions.SCHEMA);
+        }
+        // to add tablePrefix and tableSuffix
+        String tempTableName;
+        String prefix = catalogOptions.get(JdbcCatalogOptions.TABLE_PREFIX);
+        String suffix = catalogOptions.get(JdbcCatalogOptions.TABLE_SUFFIX);
+        if (StringUtils.isNotEmpty(prefix) || StringUtils.isNotEmpty(suffix)) {
+            tempTableName = StringUtils.isNotEmpty(prefix) ? prefix + 
sinkTableName : sinkTableName;
+            tempTableName = StringUtils.isNotEmpty(suffix) ? tempTableName + 
suffix : tempTableName;
 
-            } else {
-                tempTableName = sinkTableName;
-            }
-            // to replace
-            String finalDatabaseName = sinkDatabaseName;
-            if (StringUtils.isNotEmpty(sourceDatabaseName)) {
-                finalDatabaseName =
-                        sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, 
sourceDatabaseName);
-            }
+        } else {
+            tempTableName = sinkTableName;
+        }
+        // to replace
+        String finalDatabaseName = sinkDatabaseName;
+        if (StringUtils.isNotEmpty(sourceDatabaseName)) {
+            finalDatabaseName =
+                    sinkDatabaseName.replace(REPLACE_DATABASE_NAME_KEY, 
sourceDatabaseName);
+        }
 
-            String finalSchemaName;
-            if (sinkSchemaName != null) {
-                if (sourceSchemaName == null) {
-                    finalSchemaName = sinkSchemaName;
-                } else {
-                    finalSchemaName =
-                            sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, 
sourceSchemaName);
-                }
+        String finalSchemaName;
+        if (sinkSchemaName != null) {
+            if (sourceSchemaName == null) {
+                finalSchemaName = sinkSchemaName;
             } else {
-                finalSchemaName = null;
-            }
-            String finalTableName = sinkTableName;
-            if (StringUtils.isNotEmpty(sourceTableName)) {
-                finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, 
sourceTableName);
+                finalSchemaName = 
sinkSchemaName.replace(REPLACE_SCHEMA_NAME_KEY, sourceSchemaName);
             }
+        } else {
+            finalSchemaName = null;
+        }
+        String finalTableName = sinkTableName;
+        if (StringUtils.isNotEmpty(sourceTableName)) {
+            finalTableName = tempTableName.replace(REPLACE_TABLE_NAME_KEY, 
sourceTableName);
+        }
 
-            // rebuild TableIdentifier and catalogTable
-            TableIdentifier newTableId =
-                    TableIdentifier.of(
-                            tableId.getCatalogName(),
-                            finalDatabaseName,
-                            finalSchemaName,
-                            finalTableName);
-            catalogTable =
-                    CatalogTable.of(
-                            newTableId,
-                            catalogTable.getTableSchema(),
-                            catalogTable.getOptions(),
-                            catalogTable.getPartitionKeys(),
-                            catalogTable.getComment(),
-                            catalogTable.getCatalogName());
-            Map<String, String> map = config.toMap();
-            if (catalogTable.getTableId().getSchemaName() != null) {
+        // rebuild TableIdentifier and catalogTable
+        TableIdentifier newTableId =
+                TableIdentifier.of(
+                        tableId.getCatalogName(),
+                        finalDatabaseName,
+                        finalSchemaName,
+                        finalTableName);
+        catalogTable =
+                CatalogTable.of(
+                        newTableId,
+                        catalogTable.getTableSchema(),
+                        catalogTable.getOptions(),
+                        catalogTable.getPartitionKeys(),
+                        catalogTable.getComment(),
+                        catalogTable.getCatalogName());
+        Map<String, String> map = config.toMap();
+        if (catalogTable.getTableId().getSchemaName() != null) {
+            map.put(
+                    TABLE.key(),
+                    catalogTable.getTableId().getSchemaName()
+                            + "."
+                            + catalogTable.getTableId().getTableName());
+        } else {
+            map.put(TABLE.key(), catalogTable.getTableId().getTableName());
+        }
+        map.put(DATABASE.key(), catalogTable.getTableId().getDatabaseName());
+        PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        if (primaryKey != null && 
!CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
+            map.put(PRIMARY_KEYS.key(), String.join(",", 
primaryKey.getColumnNames()));
+        } else {
+            Optional<ConstraintKey> keyOptional =
+                    catalogTable.getTableSchema().getConstraintKeys().stream()
+                            .filter(
+                                    key ->
+                                            
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
+                                                    key.getConstraintType()))
+                            .findFirst();
+            if (keyOptional.isPresent()) {
                 map.put(
-                        TABLE.key(),
-                        catalogTable.getTableId().getSchemaName()
-                                + "."
-                                + catalogTable.getTableId().getTableName());
-            } else {
-                map.put(TABLE.key(), catalogTable.getTableId().getTableName());
-            }
-            map.put(DATABASE.key(), 
catalogTable.getTableId().getDatabaseName());
-            PrimaryKey primaryKey = 
catalogTable.getTableSchema().getPrimaryKey();
-            if (primaryKey != null && 
!CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
-                map.put(PRIMARY_KEYS.key(), String.join(",", 
primaryKey.getColumnNames()));
-            } else {
-                Optional<ConstraintKey> keyOptional =
-                        
catalogTable.getTableSchema().getConstraintKeys().stream()
-                                .filter(
-                                        key ->
-                                                
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
-                                                        
key.getConstraintType()))
-                                .findFirst();
-                if (keyOptional.isPresent()) {
-                    map.put(
-                            PRIMARY_KEYS.key(),
-                            keyOptional.get().getColumnNames().stream()
-                                    .map(key -> key.getColumnName())
-                                    .collect(Collectors.joining(",")));
-                }
+                        PRIMARY_KEYS.key(),
+                        keyOptional.get().getColumnNames().stream()
+                                .map(key -> key.getColumnName())
+                                .collect(Collectors.joining(",")));
             }
-            config = ReadonlyConfig.fromMap(new HashMap<>(map));
         }
+        config = ReadonlyConfig.fromMap(new HashMap<>(map));
         // always execute
         final ReadonlyConfig options = config;
         JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
         FieldIdeEnum fieldIdeEnum = config.get(JdbcOptions.FIELD_IDE);
+        catalogTable
+                .getOptions()
+                .put("fieldIde", fieldIdeEnum == null ? null : 
fieldIdeEnum.getValue());
         JdbcDialect dialect =
                 JdbcDialectLoader.load(
                         sinkConfig.getJdbcConnectionConfig().getUrl(),

Reply via email to