This is an automated email from the ASF dual-hosted git repository. panjuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 41db48e6bb9 Refactor MigrationJobAPI (#29213) 41db48e6bb9 is described below commit 41db48e6bb9323858a975a3003db3e3d7ef7e4f9 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Mon Nov 27 03:29:19 2023 +0800 Refactor MigrationJobAPI (#29213) --- .../ShowMigrationSourceStorageUnitsExecutor.java | 5 +- .../handler/update/MigrateTableUpdater.java | 5 +- .../RegisterMigrationSourceStorageUnitUpdater.java | 5 +- ...nregisterMigrationSourceStorageUnitUpdater.java | 5 +- .../api/impl/ConsistencyCheckJobAPI.java | 4 +- ...tionJobOption.java => MigrationJobManager.java} | 165 +++------------- .../migration/api/impl/MigrationJobOption.java | 209 --------------------- .../migration/api/impl/MigrationJobAPITest.java | 17 +- 8 files changed, 48 insertions(+), 367 deletions(-) diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java index 2b5a4b5db72..bab525e6822 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -35,11 +36,11 @@ import java.util.List; */ public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> { - private final MigrationJobOption jobOption = new MigrationJobOption(); + private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption()); @Override public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) { - Iterator<Collection<Object>> data = jobOption.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator(); + Iterator<Collection<Object>> data = jobManager.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator(); Collection<LocalDataQueryResultRow> result = new LinkedList<>(); while (data.hasNext()) { result.add(new LocalDataQueryResultRow((List<Object>) data.next())); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java index aeb49d9005f..23f220d73fb 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java @@ -20,6 +20,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; @@ -32,13 +33,13 @@ import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStateme @Slf4j public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> { - private final MigrationJobOption jobOption = new MigrationJobOption(); + private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption()); @Override public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) { String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName(); ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new); - jobOption.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName)); + jobManager.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName)); } @Override diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java index b89d63262ca..d26fee0d0a1 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler; @@ -42,7 +43,7 @@ import java.util.Map; */ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> { - private final MigrationJobOption jobOption = new MigrationJobOption(); + private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption()); private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler(); @@ -55,7 +56,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdat DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl()); Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources); validateHandler.validate(propsMap); - jobOption.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap); + jobManager.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap); } @Override diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java index 37d7e8cc00f..3f4cdfe20d2 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -28,11 +29,11 @@ import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigration */ public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> { - private final MigrationJobOption jobOption = new MigrationJobOption(); + private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption()); @Override public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) { - jobOption.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames()); + jobManager.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames()); } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 472648bfdae..7d0c0135b74 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -147,11 +147,11 @@ public final class ConsistencyCheckJobAPI { * @param parentJobId parent job id */ public void drop(final String parentJobId) { - jobManager.stop(parentJobId); + String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId); + jobManager.stop(latestCheckJobId); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey); Collection<String> checkJobIds = governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId); - String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId); Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence( checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId)); if (previousSequence.isPresent()) { diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java similarity index 61% copy from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java index c3b8886940d..f356ed53733 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java @@ -21,47 +21,21 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; -import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; -import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; -import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; -import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; @@ -94,28 +68,36 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; /** - * Migration job option. + * Migration job manager. */ @Slf4j -public final class MigrationJobOption implements TransmissionJobOption { +public final class MigrationJobManager { - private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); + private final MigrationJobOption jobOption; + + private final PipelineJobManager jobManager; + + private final PipelineDataSourcePersistService dataSourcePersistService; + + public MigrationJobManager(final MigrationJobOption jobOption) { + this.jobOption = jobOption; + jobManager = new PipelineJobManager(jobOption); + dataSourcePersistService = new PipelineDataSourcePersistService(); + } /** - * Create job migration config and start. + * Start migration job. * * @param contextKey context key * @param param create migration job parameter * @return job id */ - public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) { + public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) { MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param)); - new PipelineJobManager(this).start(jobConfig); + jobManager.start(jobConfig); return jobConfig.getJobId(); } @@ -163,7 +145,7 @@ public final class MigrationJobOption implements TransmissionJobOption { result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes)); result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal()); result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList())); - extendYamlJobConfiguration(contextKey, result); + jobOption.extendYamlJobConfiguration(contextKey, result); return result; } @@ -185,9 +167,7 @@ public final class MigrationJobOption implements TransmissionJobOption { } private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) { - if (rules.isEmpty()) { - throw new NoAnyRuleExistsException(databaseName); - } + ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new NoAnyRuleExistsException(databaseName)); YamlRootConfiguration result = new YamlRootConfiguration(); result.setDatabaseName(databaseName); result.setDataSources(yamlDataSources); @@ -201,95 +181,6 @@ public final class MigrationJobOption implements TransmissionJobOption { return result; } - @Override - public PipelineJobInfo getJobInfo(final String jobId) { - PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - List<String> sourceTables = new LinkedList<>(); - new PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes() - .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); - return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); - } - - @Override - public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { - YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig; - if (null == yamlJobConfig.getJobId()) { - config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal()); - } - } - - @SuppressWarnings("unchecked") - @Override - public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { - return new YamlMigrationJobConfigurationSwapper(); - } - - @Override - public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; - IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( - jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); - Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( - ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration( - jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration( - incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); - log.info("buildTaskConfiguration, result={}", result); - return result; - } - - private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - Collection<CreateTableConfiguration> result = new LinkedList<>(); - for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) { - String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName()); - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData(); - String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null; - DataNode dataNode = each.getDataNodes().get(0); - PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName()); - CreateTableConfiguration createTableConfig = new CreateTableConfiguration( - sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()), - jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName())); - result.add(createTableConfig); - } - log.info("buildCreateTableConfigurations, result={}", result); - return result; - } - - private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig); - JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); - int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); - int retryTimes = jobConfig.getRetryTimes(); - int concurrency = jobConfig.getConcurrency(); - return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency); - } - - @Override - public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); - return new MigrationProcessContext(jobConfig.getJobId(), processConfig); - } - - @Override - public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { - return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); - } - - @Override - public Optional<String> getToBeStartDisabledNextJobType() { - return Optional.of("CONSISTENCY_CHECK"); - } - - @Override - public Optional<String> getToBeStoppedPreviousJobType() { - return Optional.of("CONSISTENCY_CHECK"); - } - /** * Add migration source resources. * @@ -297,7 +188,7 @@ public final class MigrationJobOption implements TransmissionJobOption { * @param propsMap data source pool properties map */ public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) { - Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType()); + Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, jobOption.getType()); Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F); for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) { if (existDataSources.containsKey(entry.getKey())) { @@ -307,7 +198,7 @@ public final class MigrationJobOption implements TransmissionJobOption { ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames)); Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources); result.putAll(propsMap); - dataSourcePersistService.persist(contextKey, getType(), result); + dataSourcePersistService.persist(contextKey, jobOption.getType(), result); } /** @@ -317,13 +208,13 @@ public final class MigrationJobOption implements TransmissionJobOption { * @param resourceNames resource names */ public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) { - Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType()); + Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, jobOption.getType()); List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList()); ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources)); for (String each : resourceNames) { metaDataDataSource.remove(each); } - dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource); + dataSourcePersistService.persist(contextKey, jobOption.getType(), metaDataDataSource); } /** @@ -333,7 +224,7 @@ public final class MigrationJobOption implements TransmissionJobOption { * @return migration source resources */ public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) { - Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType()); + Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, jobOption.getType()); Collection<Collection<Object>> result = new ArrayList<>(propsMap.size()); for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) { String dataSourceName = entry.getKey(); @@ -367,14 +258,4 @@ public final class MigrationJobOption implements TransmissionJobOption { } return ""; } - - @Override - public Class<MigrationJob> getJobClass() { - return MigrationJob.class; - } - - @Override - public String getType() { - return "MIGRATION"; - } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java index c3b8886940d..c62eb313f46 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java @@ -20,41 +20,29 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; -import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; -import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; -import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException; -import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; -import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; -import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; -import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; @@ -64,36 +52,14 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest. import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; -import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; -import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties; -import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.datanode.DataNode; -import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; -import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; -import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit; -import org.apache.shardingsphere.infra.util.json.JsonUtils; -import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; -import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; -import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; -import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement; -import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry; -import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -104,103 +70,6 @@ import java.util.stream.Collectors; @Slf4j public final class MigrationJobOption implements TransmissionJobOption { - private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); - - /** - * Create job migration config and start. - * - * @param contextKey context key - * @param param create migration job parameter - * @return job id - */ - public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) { - MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param)); - new PipelineJobManager(this).start(jobConfig); - return jobConfig.getJobId(); - } - - private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) { - YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration(); - result.setTargetDatabaseName(param.getTargetDatabaseName()); - Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION"); - Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>(); - Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>(); - List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName) - .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList()); - YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); - for (SourceTargetEntry each : sourceTargetEntries) { - sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource()); - ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(), - () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName())); - String dataSourceName = each.getSource().getDataSourceName(); - if (configSources.containsKey(dataSourceName)) { - continue; - } - ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName), - () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it.")); - Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName)); - StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps); - configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter())); - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData(); - if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) { - each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig)); - } - DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType(); - if (null == result.getSourceDatabaseType()) { - result.setSourceDatabaseType(sourceDatabaseType.getType()); - } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) { - throw new PipelineInvalidParameterException("Source storage units have different database types"); - } - } - result.setSources(configSources); - ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName()); - PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase); - result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter())); - result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType()); - List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream() - .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList()); - result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList())); - result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes)); - result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal()); - result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList())); - extendYamlJobConfiguration(contextKey, result); - return result; - } - - private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) { - YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration(); - result.setType(type); - result.setParameter(param); - return result; - } - - private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) { - Map<String, Map<String, Object>> targetPoolProps = new HashMap<>(); - YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); - for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) { - targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties())); - } - YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations()); - return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig); - } - - private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) { - if (rules.isEmpty()) { - throw new NoAnyRuleExistsException(databaseName); - } - YamlRootConfiguration result = new YamlRootConfiguration(); - result.setDatabaseName(databaseName); - result.setDataSources(yamlDataSources); - result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules)); - return result; - } - - private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) { - Map<String, String> result = new LinkedHashMap<>(); - sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName())); - return result; - } - @Override public PipelineJobInfo getJobInfo(final String jobId) { PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); @@ -290,84 +159,6 @@ public final class MigrationJobOption implements TransmissionJobOption { return Optional.of("CONSISTENCY_CHECK"); } - /** - * Add migration source resources. - * - * @param contextKey context key - * @param propsMap data source pool properties map - */ - public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) { - Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType()); - Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F); - for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) { - if (existDataSources.containsKey(entry.getKey())) { - duplicateDataSourceNames.add(entry.getKey()); - } - } - ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames)); - Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources); - result.putAll(propsMap); - dataSourcePersistService.persist(contextKey, getType(), result); - } - - /** - * Drop migration source resources. - * - * @param contextKey context key - * @param resourceNames resource names - */ - public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) { - Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType()); - List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList()); - ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources)); - for (String each : resourceNames) { - metaDataDataSource.remove(each); - } - dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource); - } - - /** - * Query migration source resources list. - * - * @param contextKey context key - * @return migration source resources - */ - public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) { - Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType()); - Collection<Collection<Object>> result = new ArrayList<>(propsMap.size()); - for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) { - String dataSourceName = entry.getKey(); - DataSourcePoolProperties value = entry.getValue(); - Collection<Object> props = new LinkedList<>(); - props.add(dataSourceName); - String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url")); - DatabaseType databaseType = DatabaseTypeFactory.get(url); - props.add(databaseType.getType()); - ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null); - props.add(connectionProps.getHostname()); - props.add(connectionProps.getPort()); - props.add(connectionProps.getCatalog()); - Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties(); - props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds")); - props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds")); - props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds")); - props.add(getStandardProperty(standardProps, "maxPoolSize")); - props.add(getStandardProperty(standardProps, "minPoolSize")); - props.add(getStandardProperty(standardProps, "readOnly")); - Map<String, Object> otherProps = value.getCustomProperties().getProperties(); - props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps)); - result.add(props); - } - return result; - } - - private String getStandardProperty(final Map<String, Object> standardProps, final String key) { - if (standardProps.containsKey(key) && null != standardProps.get(key)) { - return standardProps.get(key).toString(); - } - return ""; - } - @Override public Class<MigrationJob> getJobClass() { return MigrationJob.class; diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 4cb3930a110..32e144ab645 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManag import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; @@ -95,6 +96,8 @@ class MigrationJobAPITest { private static MigrationJobAPI jobAPI; + private static MigrationJobManager migrationJobManager; + private static PipelineJobConfigurationManager jobConfigManager; private static PipelineJobManager jobManager; @@ -110,6 +113,7 @@ class MigrationJobAPITest { PipelineContextUtils.mockModeConfigAndContextManager(); jobOption = new MigrationJobOption(); jobAPI = new MigrationJobAPI(); + migrationJobManager = new MigrationJobManager(jobOption); jobConfigManager = new PipelineJobConfigurationManager(jobOption); jobManager = new PipelineJobManager(jobOption); transmissionJobManager = new TransmissionJobManager(jobOption); @@ -120,12 +124,13 @@ class MigrationJobAPITest { props.put("jdbcUrl", jdbcUrl); props.put("username", "root"); props.put("password", "root"); - jobOption.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); + migrationJobManager.addMigrationSourceResources( + PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props))); } @AfterAll static void afterClass() { - jobOption.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0")); + migrationJobManager.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0")); } @Test @@ -253,20 +258,20 @@ class MigrationJobAPITest { void assertCreateJobConfigFailedOnMoreThanOneSourceTable() { List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1") .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList()); - assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db"))); + assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db"))); } @Test void assertCreateJobConfigFailedOnDataSourceNotExist() { List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order")); - assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db"))); + assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db"))); } @Test void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); - String jobId = jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); + String jobId = migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes(); @@ -294,7 +299,7 @@ class MigrationJobAPITest { @Test void assertShowMigrationSourceResources() { - Collection<Collection<Object>> actual = jobOption.listMigrationSourceResources(PipelineContextUtils.getContextKey()); + Collection<Collection<Object>> actual = migrationJobManager.listMigrationSourceResources(PipelineContextUtils.getContextKey()); assertThat(actual.size(), is(1)); Collection<Object> objects = actual.iterator().next(); assertThat(objects.toArray()[0], is("ds_0"));