This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c4db6bf4070 Compatible with different source and target database type
migration (#20421)
c4db6bf4070 is described below
commit c4db6bf40703b353ff575312fd3fa345082cd078
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 23 09:56:09 2022 +0800
Compatible with different source and target database type migration (#20421)
---
.../pipeline/core/prepare/PipelineJobPreparerUtils.java | 16 ++++++++++++++++
.../core/task/InventoryIncrementalTasksRunner.java | 4 ++++
.../data/pipeline/core/task/InventoryTask.java | 4 ++--
.../scenario/migration/MigrationJobItemContext.java | 9 +++++++++
.../scenario/migration/MigrationJobPreparer.java | 13 +++++++++----
5 files changed, 40 insertions(+), 6 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 327e16cd324..7670ae28bbc 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -42,8 +42,11 @@ import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
import javax.sql.DataSource;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
/**
* Pipeline job preparer utils.
@@ -51,6 +54,19 @@ import java.util.Optional;
@Slf4j
public final class PipelineJobPreparerUtils {
+ private static final Set<String> INCREMENTAL_SUPPORTED_DATABASES = new
HashSet<>(Arrays.asList("MySQL", "PostgreSQL", "openGauss"));
+
+ /**
+ * Is incremental supported.
+ *
+ * @param databaseType database type
+ * @return true if supported, otherwise false
+ */
+ public static boolean isIncrementalSupported(final String databaseType) {
+ // TODO check by IncrementalDumperCreator SPI
+ return INCREMENTAL_SUPPORTED_DATABASES.contains(databaseType);
+ }
+
/**
* Prepare target schema.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 3ac38ede62f..b9ea57c776c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -130,6 +130,10 @@ public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunne
}
private synchronized void executeIncrementalTask() {
+ if (incrementalTasks.isEmpty()) {
+ log.info("incrementalTasks empty, ignore");
+ return;
+ }
if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemContext.getStatus()) {
log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
return;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 762ae27858a..23dbf5ffc77 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -72,8 +72,8 @@ public final class InventoryTask extends
AbstractLifecycleExecutor implements Pi
this.importerExecuteEngine = importerExecuteEngine;
taskId = generateTaskId(inventoryDumperConfig);
channel = createChannel(pipelineChannelCreator);
- dumper =
InventoryDumperCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createInventoryDumper(inventoryDumperConfig,
channel, sourceDataSource,
- sourceMetaDataLoader);
+ dumper =
InventoryDumperCreatorFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType())
+ .createInventoryDumper(inventoryDumperConfig, channel,
sourceDataSource, sourceMetaDataLoader);
importer =
ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig,
dataSourceManager, channel, jobProgressListener);
position = inventoryDumperConfig.getPosition();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index bfd3e25ca6b..c5a4470921b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -113,4 +113,13 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
return sourceMetaDataLoaderLazyInitializer.get();
}
+
+ /**
+ * Is source and target database the same or not.
+ *
+ * @return true if source and target database the same, otherwise false
+ */
+ public boolean isSourceTargetDatabaseTheSame() {
+ return
jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 0c47a28a51a..361298ca142 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -80,9 +80,11 @@ public final class MigrationJobPreparer {
}
// TODO check metadata
try {
- initIncrementalTasks(jobItemContext);
- if (jobItemContext.isStopping()) {
- throw new PipelineIgnoredException("Job stopping, jobId=" +
jobItemContext.getJobId());
+ if
(PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType()))
{
+ initIncrementalTasks(jobItemContext);
+ if (jobItemContext.isStopping()) {
+ throw new PipelineIgnoredException("Job stopping, jobId="
+ jobItemContext.getJobId());
+ }
}
initInventoryTasks(jobItemContext);
log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={},
incrementalTasks={}",
@@ -123,7 +125,10 @@ public final class MigrationJobPreparer {
}
private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext) throws SQLException {
- prepareTarget(jobItemContext);
+ if (jobItemContext.isSourceTargetDatabaseTheSame()) {
+ log.info("prepare target ...");
+ prepareTarget(jobItemContext);
+ }
InventoryIncrementalJobItemProgress initProgress =
jobItemContext.getInitProgress();
if (null == initProgress || initProgress.getStatus() ==
JobStatus.PREPARING_FAILURE) {
PipelineDataSourceWrapper targetDataSource =
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());