This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3139cb9129a Refactor
PipelineJobConfiguration.getSourceDatabaseType()'s return type (#26857)
3139cb9129a is described below
commit 3139cb9129acdae502f1b884172eceec9588976b
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jul 9 17:12:48 2023 +0800
Refactor PipelineJobConfiguration.getSourceDatabaseType()'s return type
(#26857)
---
.../config/job/PipelineJobConfiguration.java | 4 +++-
.../AbstractInventoryIncrementalJobAPIImpl.java | 2 +-
.../core/preparer/InventoryTaskSplitter.java | 5 +----
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +-
.../cdc/config/job/CDCJobConfiguration.java | 3 ++-
.../swapper/YamlCDCJobConfigurationSwapper.java | 7 +++++--
.../job/YamlCDCJobConfigurationSwapperTest.java | 1 +
.../config/ConsistencyCheckJobConfiguration.java | 3 ++-
.../migration/api/impl/MigrationJobAPI.java | 4 ++--
.../config/MigrationJobConfiguration.java | 5 +++--
.../migration/context/MigrationJobItemContext.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 22 +++++++++-------------
.../job/YamlMigrationJobConfigurationSwapper.java | 8 +++++---
13 files changed, 36 insertions(+), 32 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
index 3240f0ed8de..cc9df2d6348 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.common.config.job;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+
/**
* Pipeline job configuration.
*/
@@ -41,5 +43,5 @@ public interface PipelineJobConfiguration {
*
* @return source database type
*/
- String getSourceDatabaseType();
+ DatabaseType getSourceDatabaseType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 9267949059d..ff43cf6fa40 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -146,7 +146,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
InventoryIncrementalJobItemContext context =
(InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(context.getStatus());
-
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType().getType());
jobItemProgress.setDataSourceName(context.getDataSourceName());
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index b659ba175ca..5a0dae236ef 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -48,9 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -205,8 +203,7 @@ public final class InventoryTaskSplitter {
private Range<Long> getUniqueKeyValuesRange(final
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- DatabaseType sourceDatabaseType =
TypedSPILoader.getService(DatabaseType.class,
jobItemContext.getJobConfig().getSourceDatabaseType());
- String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, sourceDatabaseType)
+ String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobItemContext.getJobConfig().getSourceDatabaseType())
.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index a85e6c1696d..f351f866e85 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -194,7 +194,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
final PipelineDataSourceManager dataSourceManager,
final DumperConfiguration dumperConfig) throws SQLException {
InventoryIncrementalJobItemProgress result = new
InventoryIncrementalJobItemProgress();
- result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
+
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType().getType());
result.setDataSourceName(dumperConfig.getDataSourceName());
IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null,
dumperConfig, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index 027af04acf8..28d6afca706 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import java.util.List;
import java.util.Properties;
@@ -42,7 +43,7 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
private final boolean full;
- private final String sourceDatabaseType;
+ private final DatabaseType sourceDatabaseType;
private final ShardingSpherePipelineDataSourceConfiguration
dataSourceConfig;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
index f4cb413f470..5002c389bc8 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java
@@ -25,6 +25,8 @@ import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -46,7 +48,7 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
result.setDatabaseName(data.getDatabaseName());
result.setSchemaTableNames(data.getSchemaTableNames());
result.setFull(data.isFull());
- result.setSourceDatabaseType(data.getSourceDatabaseType());
+ result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
result.setDataSourceConfiguration(dataSourceConfigSwapper.swapToYamlConfiguration(data.getDataSourceConfig()));
result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes()
? null : data.getTablesFirstDataNodes().marshal());
List<String> jobShardingDataNodes = null ==
data.getJobShardingDataNodes() ? null :
data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
@@ -73,7 +75,8 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
YamlSinkConfiguration yamlSinkConfig = yamlConfig.getSinkConfig();
SinkConfiguration sinkConfig = new
SinkConfiguration(CDCSinkType.valueOf(yamlSinkConfig.getSinkType()),
yamlSinkConfig.getProps());
JobDataNodeLine tablesFirstDataNodes = null ==
yamlConfig.getTablesFirstDataNodes() ? null :
JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
- return new CDCJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(),
yamlConfig.isFull(), yamlConfig.getSourceDatabaseType(),
+ return new CDCJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabaseName(), yamlConfig.getSchemaTableNames(),
yamlConfig.isFull(),
+ TypedSPILoader.getService(DatabaseType.class,
yamlConfig.getSourceDatabaseType()),
(ShardingSpherePipelineDataSourceConfiguration)
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()),
tablesFirstDataNodes,
jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig,
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
index a4888939630..dbc58f60781 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapperTest.java
@@ -39,6 +39,7 @@ class YamlCDCJobConfigurationSwapperTest {
yamlJobConfig.setDatabaseName("test_db");
yamlJobConfig.setSchemaTableNames(Arrays.asList("test.t_order",
"t_order_item"));
yamlJobConfig.setFull(true);
+ yamlJobConfig.setSourceDatabaseType("MySQL");
YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
sinkConfig.setSinkType(CDCSinkType.SOCKET.name());
yamlJobConfig.setSinkConfig(sinkConfig);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
index 34cadbf952d..f6a5af93734 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import java.util.Properties;
@@ -41,7 +42,7 @@ public final class ConsistencyCheckJobConfiguration
implements PipelineJobConfig
private final Properties algorithmProps;
@Override
- public String getSourceDatabaseType() {
+ public DatabaseType getSourceDatabaseType() {
throw new UnsupportedOperationException("");
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0402c5e6aa2..23e411f4fba 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -294,7 +294,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
for (JobDataNodeEntry each :
jobConfig.getTablesFirstDataNodes().getEntries()) {
String sourceSchemaName =
tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
- String targetSchemaName =
TypedSPILoader.getService(DatabaseType.class,
jobConfig.getTargetDatabaseType()).isSchemaAvailable() ? sourceSchemaName :
null;
+ String targetSchemaName =
jobConfig.getTargetDatabaseType().isSchemaAvailable() ? sourceSchemaName : null;
DataNode dataNode = each.getDataNodes().get(0);
PipelineDataSourceConfiguration sourceDataSourceConfig =
jobConfig.getSources().get(dataNode.getDataSourceName());
CreateTableEntry createTableEntry = new CreateTableEntry(
@@ -396,7 +396,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
- PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
TypedSPILoader.getService(DatabaseType.class,
jobConfig.getTargetDatabaseType()));
+ PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobConfig.getTargetDatabaseType());
TableNameSchemaNameMapping mapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
index 79bd8d470bf..2e95afce23c 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java
@@ -23,6 +23,7 @@ import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
import java.util.List;
import java.util.Map;
@@ -39,9 +40,9 @@ public final class MigrationJobConfiguration implements
PipelineJobConfiguration
private final String targetDatabaseName;
- private final String sourceDatabaseType;
+ private final DatabaseType sourceDatabaseType;
- private final String targetDatabaseType;
+ private final DatabaseType targetDatabaseType;
private final Map<String, PipelineDataSourceConfiguration> sources;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index 4fa0e602447..fabd841a7e0 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -135,7 +135,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
* @return true if source and target database the same, otherwise false
*/
public boolean isSourceTargetDatabaseTheSame() {
- return
jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
+ return jobConfig.getSourceDatabaseType() ==
jobConfig.getTargetDatabaseType();
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 8fc99777aef..c7bf5a2187e 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -56,7 +56,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
@@ -65,7 +64,6 @@ import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
import java.sql.SQLException;
@@ -92,8 +90,7 @@ public final class MigrationJobPreparer {
public void prepare(final MigrationJobItemContext jobItemContext) throws
SQLException {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
- DatabaseType sourceDatabaseType =
TypedSPILoader.getService(DatabaseType.class,
jobItemContext.getJobConfig().getSourceDatabaseType());
- PipelineJobPreparerUtils.checkSourceDataSource(sourceDatabaseType,
Collections.singleton(jobItemContext.getSourceDataSource()));
+
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
@@ -105,7 +102,7 @@ public final class MigrationJobPreparer {
}
prepareIncremental(jobItemContext);
initInventoryTasks(jobItemContext);
- if
(PipelineJobPreparerUtils.isIncrementalSupported(sourceDatabaseType)) {
+ if
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
{
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
@@ -146,27 +143,26 @@ public final class MigrationJobPreparer {
}
private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext) throws SQLException {
- DatabaseType targetDatabaseType =
TypedSPILoader.getService(DatabaseType.class,
jobItemContext.getJobConfig().getTargetDatabaseType());
if (jobItemContext.isSourceTargetDatabaseTheSame()) {
- prepareTarget(jobItemContext, targetDatabaseType);
+ prepareTarget(jobItemContext);
}
InventoryIncrementalJobItemProgress initProgress =
jobItemContext.getInitProgress();
if (null == initProgress) {
PipelineDataSourceWrapper targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- PipelineJobPreparerUtils.checkTargetDataSource(targetDatabaseType,
jobItemContext.getTaskConfig().getImporterConfig(),
Collections.singleton(targetDataSource));
+ PipelineJobPreparerUtils.checkTargetDataSource(
+ jobItemContext.getJobConfig().getTargetDatabaseType(),
jobItemContext.getTaskConfig().getImporterConfig(),
Collections.singleton(targetDataSource));
}
}
- private void prepareTarget(final MigrationJobItemContext jobItemContext,
final DatabaseType targetDatabaseType) throws SQLException {
+ private void prepareTarget(final MigrationJobItemContext jobItemContext)
throws SQLException {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
CreateTableConfiguration createTableConfig =
jobItemContext.getTaskConfig().getCreateTableConfig();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
- PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(
- DatabaseTypedSPILoader.getService(DatabaseType.class,
targetDatabaseType), createTableConfig, dataSourceManager);
- PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParam);
+ PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
createTableConfig, dataSourceManager);
+
PipelineJobPreparerUtils.prepareTargetSchema(jobItemContext.getJobConfig().getTargetDatabaseType(),
prepareTargetSchemasParam);
ShardingSphereMetaData metaData =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine =
PipelineJobPreparerUtils.getSQLParserEngine(metaData,
jobConfig.getTargetDatabaseName());
- PipelineJobPreparerUtils.prepareTargetTables(targetDatabaseType, new
PrepareTargetTablesParameter(createTableConfig, dataSourceManager,
sqlParserEngine));
+
PipelineJobPreparerUtils.prepareTargetTables(jobItemContext.getJobConfig().getTargetDatabaseType(),
new PrepareTargetTablesParameter(createTableConfig, dataSourceManager,
sqlParserEngine));
}
private void prepareIncremental(final MigrationJobItemContext
jobItemContext) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
index 0320b1c8c68..52db3c7858f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.yaml.job;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
@@ -39,8 +41,8 @@ public final class YamlMigrationJobConfigurationSwapper
implements YamlConfigura
YamlMigrationJobConfiguration result = new
YamlMigrationJobConfiguration();
result.setJobId(data.getJobId());
result.setTargetDatabaseName(data.getTargetDatabaseName());
- result.setSourceDatabaseType(data.getSourceDatabaseType());
- result.setTargetDatabaseType(data.getTargetDatabaseType());
+ result.setSourceDatabaseType(data.getSourceDatabaseType().getType());
+ result.setTargetDatabaseType(data.getTargetDatabaseType().getType());
result.setSources(data.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
entry ->
dataSourceConfigSwapper.swapToYamlConfiguration(entry.getValue()), (key, value)
-> value, LinkedHashMap::new)));
result.setTarget(dataSourceConfigSwapper.swapToYamlConfiguration(data.getTarget()));
@@ -56,7 +58,7 @@ public final class YamlMigrationJobConfigurationSwapper
implements YamlConfigura
@Override
public MigrationJobConfiguration swapToObject(final
YamlMigrationJobConfiguration yamlConfig) {
return new MigrationJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabaseName(),
- yamlConfig.getSourceDatabaseType(),
yamlConfig.getTargetDatabaseType(),
+ TypedSPILoader.getService(DatabaseType.class,
yamlConfig.getSourceDatabaseType()),
TypedSPILoader.getService(DatabaseType.class,
yamlConfig.getTargetDatabaseType()),
yamlConfig.getSources().entrySet().stream().collect(Collectors.toMap(Entry::getKey,
entry ->
dataSourceConfigSwapper.swapToObject(entry.getValue()), (key, value) -> value,
LinkedHashMap::new)),
dataSourceConfigSwapper.swapToObject(yamlConfig.getTarget()),