This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new da9e563facc Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
da9e563facc is described below

commit da9e563faccf2344ed250ca64a3caea783c95ebb
Author: Da Xiang Huang <[email protected]>
AuthorDate: Wed Jul 27 17:11:25 2022 +0800

    Remove TaskConfiguration reference from PrepareTargetSchemasParameter and PrepareTargetTablesParameter (#19455)
---
 .../datasource/AbstractDataSourcePreparer.java     | 35 ++++++++-------------
 .../datasource/PrepareTargetSchemasParameter.java  | 12 ++++++--
 .../datasource/PrepareTargetTablesParameter.java   | 26 ++++++----------
 .../rulealtered/RuleAlteredJobPreparer.java        | 36 +++++++++++++++-------
 .../datasource/MySQLDataSourcePreparer.java        |  4 +--
 .../datasource/MySQLDataSourcePreparerTest.java    |  2 +-
 .../datasource/OpenGaussDataSourcePreparer.java    |  4 +--
 .../datasource/PostgreSQLDataSourcePreparer.java   |  4 +--
 8 files changed, 65 insertions(+), 58 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index abf908c50ac..a5bde33ac74 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,25 +17,22 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
-import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
 
 /**
  * Abstract data source preparer.
@@ -51,17 +48,11 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
     
     @Override
     public void prepareTargetSchemas(final PrepareTargetSchemasParameter 
parameter) {
-        DatabaseType sourceDatabaseType = 
DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getSourceDatabaseType());
-        DatabaseType targetDatabaseType = 
DatabaseTypeFactory.getInstance(parameter.getTaskConfig().getJobConfig().getTargetDatabaseType());
-        if (!sourceDatabaseType.isSchemaAvailable() || 
!targetDatabaseType.isSchemaAvailable()) {
-            log.info("prepareTargetSchemas, one of source or target database 
type schema is not available, ignore");
-            return;
-        }
         Set<String> schemaNames = getSchemaNames(parameter);
-        String defaultSchema = 
DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType, 
parameter.getTaskConfig().getJobConfig().getDatabaseName());
+        String defaultSchema = 
DatabaseTypeEngine.getDefaultSchemaName(parameter.getTargetDatabaseType(), 
parameter.getDatabaseName());
         log.info("prepareTargetSchemas, schemaNames={}, defaultSchema={}", 
schemaNames, defaultSchema);
-        PipelineSQLBuilder pipelineSQLBuilder = 
PipelineSQLBuilderFactory.getInstance(targetDatabaseType.getType());
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getTaskConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        PipelineSQLBuilder pipelineSQLBuilder = 
PipelineSQLBuilderFactory.getInstance(parameter.getTargetDatabaseType().getType());
+        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String each : schemaNames) {
                 if (each.equalsIgnoreCase(defaultSchema)) {
                     continue;
@@ -80,7 +71,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
     
     private Set<String> getSchemaNames(final PrepareTargetSchemasParameter 
parameter) {
         Set<String> result = new HashSet<>();
-        for (String each : 
parameter.getTaskConfig().getJobConfig().splitLogicTableNames()) {
+        for (String each : parameter.getLogicTableNames()) {
             String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each);
             if (null == schemaName) {
                 throw new PipelineJobPrepareFailedException("Can not get 
schemaName by logic table name " + each);
@@ -95,8 +86,8 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         return 
dataSourceManager.getDataSource(PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter()));
     }
     
-    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final 
TaskConfiguration taskConfig, final PipelineDataSourceManager 
dataSourceManager) {
-        return 
dataSourceManager.getDataSource(taskConfig.getImporterConfig().getDataSourceConfig());
+    protected final PipelineDataSourceWrapper getTargetCachedDataSource(final 
PipelineDataSourceConfiguration dataSourceConfig, final 
PipelineDataSourceManager dataSourceManager) {
+        return dataSourceManager.getDataSource(dataSourceConfig);
     }
     
     protected final void executeTargetTableSQL(final Connection 
targetConnection, final String sql) throws SQLException {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index b19cd0afd7b..5c4a4b382bc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -17,11 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
+import java.util.List;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
 /**
  * Prepare target schemas parameter.
@@ -30,7 +32,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 @Getter
 public final class PrepareTargetSchemasParameter {
     
-    private final TaskConfiguration taskConfig;
+    private final List<String> logicTableNames;
+    
+    private final DatabaseType targetDatabaseType;
+    
+    private final String databaseName;
+    
+    private final PipelineDataSourceConfiguration dataSourceConfig;
     
     private final PipelineDataSourceManager dataSourceManager;
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index d49cba24444..cf399464508 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -20,9 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 import lombok.Getter;
 import lombok.NonNull;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
 /**
@@ -31,28 +30,23 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 @Getter
 public final class PrepareTargetTablesParameter {
     
-    private final TaskConfiguration taskConfig;
+    private final String databaseName;
     
     private final JobDataNodeLine tablesFirstDataNodes;
     
+    private final PipelineDataSourceConfiguration dataSourceConfig;
+    
     private final PipelineDataSourceManager dataSourceManager;
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    public PrepareTargetTablesParameter(@NonNull final TaskConfiguration 
taskConfig, @NonNull final PipelineDataSourceManager dataSourceManager,
-                                        final TableNameSchemaNameMapping 
tableNameSchemaNameMapping) {
-        this.taskConfig = taskConfig;
-        tablesFirstDataNodes = 
JobDataNodeLine.unmarshal(taskConfig.getJobConfig().getTablesFirstDataNodes());
+    public PrepareTargetTablesParameter(@NonNull final String databaseName, 
@NonNull final PipelineDataSourceConfiguration dataSourceConfig,
+                                        @NonNull final 
PipelineDataSourceManager dataSourceManager,
+                                        @NonNull final String 
tablesFirstDataNodes, final TableNameSchemaNameMapping 
tableNameSchemaNameMapping) {
+        this.databaseName = databaseName;
+        this.dataSourceConfig = dataSourceConfig;
+        this.tablesFirstDataNodes = 
JobDataNodeLine.unmarshal(tablesFirstDataNodes);
         this.dataSourceManager = dataSourceManager;
         this.tableNameSchemaNameMapping = tableNameSchemaNameMapping;
     }
-    
-    /**
-     * Get job configuration.
-     *
-     * @return job configuration
-     */
-    public RuleAlteredJobConfiguration getJobConfig() {
-        return taskConfig.getJobConfig();
-    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6456409cb22..288d1cf0b85 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -17,6 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
@@ -55,14 +62,6 @@ import org.apache.shardingsphere.infra.lock.LockScope;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 
-import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Rule altered job preparer.
  */
@@ -143,11 +142,26 @@ public final class RuleAlteredJobPreparer {
             return;
         }
         TableNameSchemaNameMapping tableNameSchemaNameMapping = 
jobContext.getTaskConfig().getDumperConfig().getTableNameSchemaNameMapping();
-        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new 
PrepareTargetSchemasParameter(jobContext.getTaskConfig(), 
jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
-        
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
-        PrepareTargetTablesParameter prepareTargetTablesParameter = new 
PrepareTargetTablesParameter(jobContext.getTaskConfig(), 
jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+        PrepareTargetSchemasParameter prepareTargetSchemasParameter = new 
PrepareTargetSchemasParameter(jobConfig.splitLogicTableNames(),
+                
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()),
+                jobConfig.getDatabaseName(), 
jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(), 
jobContext.getDataSourceManager(), tableNameSchemaNameMapping);
+        if (isSourceAndTargetSchemaAvailable(jobConfig)) {
+            
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+        }
+        PrepareTargetTablesParameter prepareTargetTablesParameter = new 
PrepareTargetTablesParameter(jobConfig.getDatabaseName(), 
jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig(),
+                jobContext.getDataSourceManager(), 
jobConfig.getTablesFirstDataNodes(), tableNameSchemaNameMapping);
         
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
     }
+
+    private boolean isSourceAndTargetSchemaAvailable(final 
RuleAlteredJobConfiguration jobConfig) {
+        DatabaseType sourceDatabaseType = 
DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType());
+        DatabaseType targetDatabaseType = 
DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType());
+        if (!sourceDatabaseType.isSchemaAvailable() || 
!targetDatabaseType.isSchemaAvailable()) {
+            log.info("prepareTargetSchemas, one of source or target database 
type schema is not available, ignore");
+            return false;
+        }
+        return true;
+    }
     
     private void checkSourceDataSource(final RuleAlteredJobContext jobContext) 
{
         DataSourceChecker dataSourceChecker = 
DataSourceCheckerFactory.getInstance(jobContext.getJobConfig().getSourceDatabaseType());
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 54d531fdd26..5c282fbfe0d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -41,7 +41,7 @@ public final class MySQLDataSourcePreparer extends 
AbstractDataSourcePreparer {
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         PipelineDataSourceManager dataSourceManager = 
parameter.getDataSourceManager();
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getTaskConfig(), 
dataSourceManager).getConnection()) {
+        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
dataSourceManager).getConnection()) {
             for (String each : getCreateTableSQL(parameter)) {
                 executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(each));
                 log.info("create target table '{}' success", each);
@@ -56,7 +56,7 @@ public final class MySQLDataSourcePreparer extends 
AbstractDataSourcePreparer {
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), 
parameter.getJobConfig().getDatabaseName(), schemaName, 
each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), 
parameter.getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 140528e89bd..98c6d100a99 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -85,7 +85,7 @@ public final class MySQLDataSourcePreparerTest {
         when(jobConfig.getTarget()).thenReturn(targetPipelineDataSourceConfig);
         when(jobConfig.getTarget().getType()).thenReturn("ShardingSphereJDBC");
         when(jobConfig.getTarget().getParameter()).thenReturn("target");
-        
when(prepareTargetTablesParameter.getJobConfig()).thenReturn(jobConfig);
+        
when(prepareTargetTablesParameter.getDatabaseName()).thenReturn("test_db");
         
when(prepareTargetTablesParameter.getTablesFirstDataNodes()).thenReturn(new 
JobDataNodeLine(Collections.emptyList()));
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index 32f635aba91..5ba5ff93801 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class OpenGaussDataSourcePreparer extends 
AbstractDataSourcePrepare
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         List<String> createLogicalTableSQLs = 
listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getTaskConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : 
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
 {
                     executeTargetTableSQL(targetConnection, 
addIfNotExistsForCreateTableSQL(each));
@@ -59,7 +59,7 @@ public final class OpenGaussDataSourcePreparer extends 
AbstractDataSourcePrepare
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new 
OpenGaussDatabaseType(), parameter.getJobConfig().getDatabaseName(), 
schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new 
OpenGaussDatabaseType(), parameter.getDatabaseName(), schemaName, 
each.getLogicTableName()));
         }
         return result;
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index d579ca04585..bc5f72469f6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -43,7 +43,7 @@ public final class PostgreSQLDataSourcePreparer extends 
AbstractDataSourcePrepar
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter 
parameter) {
         List<String> createLogicalTableSQLs = 
listCreateLogicalTableSQL(parameter);
-        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getTaskConfig(), 
parameter.getDataSourceManager()).getConnection()) {
+        try (Connection targetConnection = 
getTargetCachedDataSource(parameter.getDataSourceConfig(), 
parameter.getDataSourceManager()).getConnection()) {
             for (String createLogicalTableSQL : createLogicalTableSQLs) {
                 for (String each : 
Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList()))
 {
                     executeTargetTableSQL(targetConnection, each);
@@ -59,7 +59,7 @@ public final class PostgreSQLDataSourcePreparer extends 
AbstractDataSourcePrepar
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : 
parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(new 
PostgreSQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), 
schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new 
PostgreSQLDatabaseType(), parameter.getDatabaseName(), schemaName, 
each.getLogicTableName()));
         }
         return result;
     }

Reply via email to