This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c4db6bf4070 Compatible with different source and target database type migration (#20421)
c4db6bf4070 is described below

commit c4db6bf40703b353ff575312fd3fa345082cd078
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Aug 23 09:56:09 2022 +0800

    Compatible with different source and target database type migration (#20421)
---
 .../pipeline/core/prepare/PipelineJobPreparerUtils.java  | 16 ++++++++++++++++
 .../core/task/InventoryIncrementalTasksRunner.java       |  4 ++++
 .../data/pipeline/core/task/InventoryTask.java           |  4 ++--
 .../scenario/migration/MigrationJobItemContext.java      |  9 +++++++++
 .../scenario/migration/MigrationJobPreparer.java         | 13 +++++++++----
 5 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 327e16cd324..7670ae28bbc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -42,8 +42,11 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSour
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 
 /**
  * Pipeline job preparer utils.
@@ -51,6 +54,19 @@ import java.util.Optional;
 @Slf4j
 public final class PipelineJobPreparerUtils {
     
+    private static final Set<String> INCREMENTAL_SUPPORTED_DATABASES = new HashSet<>(Arrays.asList("MySQL", "PostgreSQL", "openGauss"));
+    
+    /**
+     * Is incremental supported.
+     *
+     * @param databaseType database type
+     * @return true if supported, otherwise false
+     */
+    public static boolean isIncrementalSupported(final String databaseType) {
+        // TODO check by IncrementalDumperCreator SPI
+        return INCREMENTAL_SUPPORTED_DATABASES.contains(databaseType);
+    }
+    
     /**
      * Prepare target schema.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 3ac38ede62f..b9ea57c776c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -130,6 +130,10 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     }
     
     private synchronized void executeIncrementalTask() {
+        if (incrementalTasks.isEmpty()) {
+            log.info("incrementalTasks empty, ignore");
+            return;
+        }
         if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemContext.getStatus()) {
             log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
             return;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 762ae27858a..23dbf5ffc77 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -72,8 +72,8 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
         this.importerExecuteEngine = importerExecuteEngine;
         taskId = generateTaskId(inventoryDumperConfig);
         channel = createChannel(pipelineChannelCreator);
-        dumper = InventoryDumperCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource,
-                sourceMetaDataLoader);
+        dumper = InventoryDumperCreatorFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType())
+                .createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
         importer = ImporterCreatorFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
         position = inventoryDumperConfig.getPosition();
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index bfd3e25ca6b..c5a4470921b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -113,4 +113,13 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     public PipelineTableMetaDataLoader getSourceMetaDataLoader() {
         return sourceMetaDataLoaderLazyInitializer.get();
     }
+    
+    /**
+     * Is source and target database the same or not.
+     *
+     * @return true if source and target database the same, otherwise false
+     */
+    public boolean isSourceTargetDatabaseTheSame() {
+        return jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index 0c47a28a51a..361298ca142 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -80,9 +80,11 @@ public final class MigrationJobPreparer {
         }
         // TODO check metadata
         try {
-            initIncrementalTasks(jobItemContext);
-            if (jobItemContext.isStopping()) {
-                throw new PipelineIgnoredException("Job stopping, jobId=" + jobItemContext.getJobId());
+            if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
+                initIncrementalTasks(jobItemContext);
+                if (jobItemContext.isStopping()) {
+                    throw new PipelineIgnoredException("Job stopping, jobId=" + jobItemContext.getJobId());
+                }
             }
             initInventoryTasks(jobItemContext);
             log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, 
incrementalTasks={}",
@@ -123,7 +125,10 @@ public final class MigrationJobPreparer {
     }
     
     private void prepareAndCheckTarget(final MigrationJobItemContext jobItemContext) throws SQLException {
-        prepareTarget(jobItemContext);
+        if (jobItemContext.isSourceTargetDatabaseTheSame()) {
+            log.info("prepare target ...");
+            prepareTarget(jobItemContext);
+        }
         InventoryIncrementalJobItemProgress initProgress = jobItemContext.getInitProgress();
         if (null == initProgress || initProgress.getStatus() == JobStatus.PREPARING_FAILURE) {
             PipelineDataSourceWrapper targetDataSource = jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());

Reply via email to